Skip to content

Commit 89e83c8

Browse files
Copilot, rkargMsft, and ReubenBond
authored
Align all membership providers' CleanupDefunctSiloEntries to remove non-Active stale entries (#9972)
* Implement CleanupDefunctSiloEntries for development clustering Fix InMemoryMembershipTable.CleanupDefunctSiloEntries to remove all non-Active old entries instead of only Dead entries, consistent with other provider implementations (Cassandra, Azure, etc.). Add unit tests for the fixed behavior in InMemoryMembershipTableTests. Co-authored-by: rkargMsft <164392675+rkargMsft@users.noreply.github.com> Agent-Logs-Url: https://github.com/dotnet/orleans/sessions/28c01159-31b4-4141-841b-f5f1aec83732 * Also fix CleanupDefunctSiloEntries in InProcessMembershipTable Apply the same non-Active predicate to InProcessMembershipTable used by the in-process test cluster, for consistency. Co-authored-by: ReubenBond <203839+ReubenBond@users.noreply.github.com> Agent-Logs-Url: https://github.com/dotnet/orleans/sessions/8e740b40-c62d-4e85-8aac-9c8a56f39bc7 * Align all membership providers: ZooKeeper and Cosmos cleanup non-Active stale entries - CosmosMembershipTable: remove all non-Active stale entries (was only Dead), using proper Math.Max(IAmAliveTime, StartTime) timestamp check - ZooKeeperBasedMembershipTable: implement CleanupDefunctSiloEntries (was NotImplementedException) to delete non-Active stale entries Co-authored-by: rkargMsft <164392675+rkargMsft@users.noreply.github.com> Agent-Logs-Url: https://github.com/dotnet/orleans/sessions/cfa9474b-c8f1-4707-b13e-8ac1054679e7 * Cosmos: push CleanupDefunctSiloEntries filters to the server via LINQ query Instead of fetching all silos and filtering in memory, build a server-side LINQ query that filters by status != Active and both date fields. The Status field is indexed; IAmAliveTime/StartTime are not indexed but the status filter reduces the result set without schema changes. 
Co-authored-by: rkargMsft <164392675+rkargMsft@users.noreply.github.com> Agent-Logs-Url: https://github.com/dotnet/orleans/sessions/55b130f3-7c14-4eac-bc1d-a808e43da0cb * Cosmos CleanupDefunctSiloEntries: push only Status filter server-side, keep date check in C# The Status field is indexed in Cosmos so filtering non-Active entries server-side is efficient. The date check uses Math.Max(IAmAliveTime, StartTime) semantics in C# to correctly handle cases where IAmAliveTime may be uninitialized (DateTime.MinValue). Co-authored-by: rkargMsft <164392675+rkargMsft@users.noreply.github.com> Agent-Logs-Url: https://github.com/dotnet/orleans/sessions/06efe6ea-ad61-43fd-ba3e-9e4f08a8752f * Simplify date comparison: compare max ticks directly instead of creating new DateTime Co-authored-by: rkargMsft <164392675+rkargMsft@users.noreply.github.com> Agent-Logs-Url: https://github.com/dotnet/orleans/sessions/0d84e85c-6909-4b6e-8d16-0f2e6782c645 --------- Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com> Co-authored-by: rkargMsft <164392675+rkargMsft@users.noreply.github.com> Co-authored-by: ReubenBond <203839+ReubenBond@users.noreply.github.com>
1 parent 6daf6ec commit 89e83c8

File tree

5 files changed

+158
-9
lines changed

5 files changed

+158
-9
lines changed

src/Azure/Orleans.Clustering.Cosmos/Membership/CosmosMembershipTable.cs

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,25 @@ public async Task CleanupDefunctSiloEntries(DateTimeOffset beforeDate)
103103
{
104104
try
105105
{
106-
var silos = (await ReadSilos(SiloStatus.Dead).ConfigureAwait(false)).Where(s => s.IAmAliveTime < beforeDate).ToList();
106+
// Filter by status server-side (Status is indexed); apply the date check in C#
107+
// so that the Math.Max(IAmAliveTime, StartTime) semantics are preserved correctly.
108+
var activeStatus = (int)SiloStatus.Active;
109+
var query = _container
110+
.GetItemLinqQueryable<SiloEntity>(requestOptions: _queryRequestOptions)
111+
.Where(g => g.EntityType == nameof(SiloEntity) && g.Status != activeStatus);
112+
113+
var iterator = query.ToFeedIterator();
114+
var nonActiveSilos = new List<SiloEntity>();
115+
do
116+
{
117+
var items = await iterator.ReadNextAsync().ConfigureAwait(false);
118+
nonActiveSilos.AddRange(items);
119+
} while (iterator.HasMoreResults);
120+
121+
var silos = nonActiveSilos
122+
.Where(s => Math.Max(s.IAmAliveTime.Ticks, s.StartTime.Ticks) < beforeDate.Ticks)
123+
.ToList();
124+
107125
if (silos.Count == 0)
108126
{
109127
return;

src/Orleans.Clustering.ZooKeeper/ZooKeeperBasedMembershipTable.cs

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -336,7 +336,23 @@ private static T Deserialize<T>(byte[] data)
336336

337337
/// <summary>
/// Deletes the membership rows of silos that are not <see cref="SiloStatus.Active"/> and whose
/// most recent timestamp (the later of <c>IAmAliveTime</c> and <c>StartTime</c>) is older than
/// <paramref name="beforeDate"/>.
/// </summary>
/// <param name="beforeDate">Cutoff instant; non-Active entries last seen before it are removed.</param>
/// <returns>A task that completes once every stale row has been deleted.</returns>
public Task CleanupDefunctSiloEntries(DateTimeOffset beforeDate)
{
    // Use UtcTicks rather than Ticks: DateTimeOffset.Ticks is the offset-local clock time, so a
    // cutoff carrying a non-zero offset would be shifted by that offset. Comparing against the
    // UTC instant matches the in-memory providers, which treat the max of the two entry
    // timestamps as a UTC DateTime.
    // NOTE(review): assumes entry.IAmAliveTime / entry.StartTime are stored as UTC - confirm.
    var cutoffUtcTicks = beforeDate.UtcTicks;

    return UsingZookeeper(async zk =>
    {
        // Every child of the root node is parsed as a silo address and its row is loaded.
        var childrenResult = await zk.getChildrenAsync("/");
        var rows = await Task.WhenAll(
            childrenResult.Children.Select(child => GetRow(zk, SiloAddress.FromParsableString(child))));

        foreach (var (entry, _) in rows)
        {
            // Active silos are never removed, no matter how old their timestamps are.
            if (entry.Status == SiloStatus.Active)
            {
                continue;
            }

            var lastSeenTicks = Math.Max(entry.IAmAliveTime.Ticks, entry.StartTime.Ticks);
            if (lastSeenTicks < cutoffUtcTicks)
            {
                await ZKUtil.deleteRecursiveAsync(zk, ConvertToRowPath(entry.SiloAddress));
            }
        }

        // The callback's result value is not meaningful here; true is returned as in the original.
        return true;
    }, this.deploymentConnectionString, this.watcher);
}
341357

342358
[LoggerMessage(

src/Orleans.Runtime/MembershipService/InMemoryMembershipTable.cs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -87,17 +87,17 @@ private string NewETag()
8787

8888
public void CleanupDefunctSiloEntries(DateTimeOffset beforeDate)
8989
{
90-
var removedEnties = new List<SiloAddress>();
91-
foreach (var (key, (value, etag)) in siloTable)
90+
var removedEntries = new List<SiloAddress>();
91+
foreach (var (key, (value, _)) in siloTable)
9292
{
93-
if (value.Status == SiloStatus.Dead
93+
if (value.Status != SiloStatus.Active
9494
&& new DateTime(Math.Max(value.IAmAliveTime.Ticks, value.StartTime.Ticks), DateTimeKind.Utc) < beforeDate)
9595
{
96-
removedEnties.Add(key);
96+
removedEntries.Add(key);
9797
}
9898
}
9999

100-
foreach (var removedEntry in removedEnties)
100+
foreach (var removedEntry in removedEntries)
101101
{
102102
siloTable.Remove(removedEntry);
103103
}

src/Orleans.TestingHost/InProcess/InProcessMembershipTable.cs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -173,11 +173,10 @@ public void CleanupDefunctSiloEntries(DateTimeOffset beforeDate)
173173
var entries = _table.Values.ToList();
174174
foreach (var (entry, _) in entries)
175175
{
176-
if (entry.Status == SiloStatus.Dead
176+
if (entry.Status != SiloStatus.Active
177177
&& new DateTime(Math.Max(entry.IAmAliveTime.Ticks, entry.StartTime.Ticks), DateTimeKind.Utc) < beforeDate)
178178
{
179179
_table.Remove(entry.SiloAddress, out _);
180-
continue;
181180
}
182181
}
183182
}
Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
using System.Net;
2+
using Microsoft.Extensions.DependencyInjection;
3+
using Orleans.Runtime;
4+
using Orleans.Runtime.MembershipService;
5+
using Orleans.Serialization;
6+
using Xunit;
7+
8+
namespace UnitTests.MembershipTests
{
    /// <summary>
    /// Verifies that <c>InMemoryMembershipTable.CleanupDefunctSiloEntries</c> (the membership table
    /// used by development clustering) removes stale non-Active entries and keeps everything else.
    /// </summary>
    [TestCategory("BVT"), TestCategory("Membership")]
    public class InMemoryMembershipTableTests
    {
        // Every test cleans up entries older than this many days.
        private const int CutoffAgeDays = 5;

        // Shared across tests so that each created silo address gets a distinct port.
        private static int s_lastPort = 10000;

        private readonly InMemoryMembershipTable _membershipTable;

        public InMemoryMembershipTableTests()
        {
            // The table needs a DeepCopier, which is resolved from a minimal serializer container.
            var serviceCollection = new ServiceCollection();
            serviceCollection.AddSerializer();
            var provider = serviceCollection.BuildServiceProvider();
            _membershipTable = new InMemoryMembershipTable(provider.GetRequiredService<DeepCopier>());
        }

        [Fact]
        public void CleanupDefunctSiloEntries_RemovesNonActiveOldEntries()
        {
            // Old and Dead: expected to be removed.
            var staleDead = InsertEntry(SiloStatus.Dead, daysOld: 10);

            // Old and Joining: expected to be removed.
            var staleJoining = InsertEntry(SiloStatus.Joining, daysOld: 10);

            // Old but Active: expected to survive.
            var staleActive = InsertEntry(SiloStatus.Active, daysOld: 10);

            // Dead but stamped with the current time: expected to survive regardless of status.
            var freshDead = InsertEntry(SiloStatus.Dead, daysOld: 0);

            RunCleanup();

            var snapshot = _membershipTable.ReadAll();
            Assert.Equal(2, snapshot.Members.Count);
            Assert.Contains(snapshot.Members, member => member.Item1.SiloAddress.Equals(staleActive.SiloAddress));
            Assert.Contains(snapshot.Members, member => member.Item1.SiloAddress.Equals(freshDead.SiloAddress));
            Assert.DoesNotContain(snapshot.Members, member => member.Item1.SiloAddress.Equals(staleDead.SiloAddress));
            Assert.DoesNotContain(snapshot.Members, member => member.Item1.SiloAddress.Equals(staleJoining.SiloAddress));
        }

        [Fact]
        public void CleanupDefunctSiloEntries_RemovesAllNonActiveStatuses()
        {
            // Insert one old entry for every status except Active.
            foreach (var status in Enum.GetValues<SiloStatus>())
            {
                if (status != SiloStatus.Active)
                {
                    InsertEntry(status, daysOld: 10);
                }
            }

            RunCleanup();

            Assert.Empty(_membershipTable.ReadAll().Members);
        }

        [Fact]
        public void CleanupDefunctSiloEntries_PreservesActiveEntries()
        {
            var staleActive = InsertEntry(SiloStatus.Active, daysOld: 30);

            RunCleanup();

            var snapshot = _membershipTable.ReadAll();
            var survivor = Assert.Single(snapshot.Members);
            Assert.Equal(staleActive.SiloAddress, survivor.Item1.SiloAddress);
        }

        /// <summary>
        /// Creates an entry and inserts it using the table version read immediately beforehand.
        /// </summary>
        private MembershipEntry InsertEntry(SiloStatus status, int daysOld)
        {
            var entry = CreateEntry(status, daysOld);
            var currentVersion = _membershipTable.ReadTableVersion();
            _membershipTable.Insert(entry, currentVersion);
            return entry;
        }

        /// <summary>
        /// Runs the cleanup with a cutoff <see cref="CutoffAgeDays"/> days before the current UTC time.
        /// </summary>
        private void RunCleanup()
        {
            var cutoff = DateTimeOffset.UtcNow - TimeSpan.FromDays(CutoffAgeDays);
            _membershipTable.CleanupDefunctSiloEntries(cutoff);
        }

        /// <summary>
        /// Builds a loopback membership entry on a fresh port whose start and I-am-alive
        /// timestamps both lie <paramref name="daysOld"/> days in the past.
        /// </summary>
        private static MembershipEntry CreateEntry(SiloStatus status, int daysOld)
        {
            var port = Interlocked.Increment(ref s_lastPort);
            var timestamp = DateTime.UtcNow - TimeSpan.FromDays(daysOld);
            var endpoint = new IPEndPoint(IPAddress.Loopback, port);

            return new MembershipEntry
            {
                SiloAddress = SiloAddress.New(endpoint, 0),
                HostName = "localhost",
                SiloName = $"TestSilo-{port}",
                Status = status,
                StartTime = timestamp,
                IAmAliveTime = timestamp,
            };
        }
    }
}

0 commit comments

Comments
 (0)