Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,25 @@ public async Task CleanupDefunctSiloEntries(DateTimeOffset beforeDate)
{
try
{
var silos = (await ReadSilos(SiloStatus.Dead).ConfigureAwait(false)).Where(s => s.IAmAliveTime < beforeDate).ToList();
// Filter by status server-side (Status is indexed); apply the date check in C#
// so that the Math.Max(IAmAliveTime, StartTime) semantics are preserved correctly.
var activeStatus = (int)SiloStatus.Active;
var query = _container
.GetItemLinqQueryable<SiloEntity>(requestOptions: _queryRequestOptions)
.Where(g => g.EntityType == nameof(SiloEntity) && g.Status != activeStatus);

var iterator = query.ToFeedIterator();
var nonActiveSilos = new List<SiloEntity>();
do
{
var items = await iterator.ReadNextAsync().ConfigureAwait(false);
nonActiveSilos.AddRange(items);
} while (iterator.HasMoreResults);

var silos = nonActiveSilos
.Where(s => Math.Max(s.IAmAliveTime.Ticks, s.StartTime.Ticks) < beforeDate.Ticks)
.ToList();

if (silos.Count == 0)
{
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,23 @@ private static T Deserialize<T>(byte[] data)

public Task CleanupDefunctSiloEntries(DateTimeOffset beforeDate)
{
throw new NotImplementedException();
return UsingZookeeper(async zk =>
{
var childrenResult = await zk.getChildrenAsync("/");
var rows = await Task.WhenAll(
childrenResult.Children.Select(child => GetRow(zk, SiloAddress.FromParsableString(child))));

foreach (var (entry, _) in rows)
{
if (entry.Status != SiloStatus.Active
&& Math.Max(entry.IAmAliveTime.Ticks, entry.StartTime.Ticks) < beforeDate.Ticks)
{
await ZKUtil.deleteRecursiveAsync(zk, ConvertToRowPath(entry.SiloAddress));
}
}

return true;
}, this.deploymentConnectionString, this.watcher);
}

[LoggerMessage(
Expand Down
10 changes: 5 additions & 5 deletions src/Orleans.Runtime/MembershipService/InMemoryMembershipTable.cs
Original file line number Diff line number Diff line change
Expand Up @@ -87,17 +87,17 @@ private string NewETag()

public void CleanupDefunctSiloEntries(DateTimeOffset beforeDate)
{
var removedEnties = new List<SiloAddress>();
foreach (var (key, (value, etag)) in siloTable)
var removedEntries = new List<SiloAddress>();
foreach (var (key, (value, _)) in siloTable)
{
if (value.Status == SiloStatus.Dead
if (value.Status != SiloStatus.Active
&& new DateTime(Math.Max(value.IAmAliveTime.Ticks, value.StartTime.Ticks), DateTimeKind.Utc) < beforeDate)
{
removedEnties.Add(key);
removedEntries.Add(key);
}
}

foreach (var removedEntry in removedEnties)
foreach (var removedEntry in removedEntries)
{
siloTable.Remove(removedEntry);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,11 +173,10 @@ public void CleanupDefunctSiloEntries(DateTimeOffset beforeDate)
var entries = _table.Values.ToList();
foreach (var (entry, _) in entries)
{
if (entry.Status == SiloStatus.Dead
if (entry.Status != SiloStatus.Active
&& new DateTime(Math.Max(entry.IAmAliveTime.Ticks, entry.StartTime.Ticks), DateTimeKind.Utc) < beforeDate)
{
_table.Remove(entry.SiloAddress, out _);
continue;
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
using System.Net;
using Microsoft.Extensions.DependencyInjection;
using Orleans.Runtime;
using Orleans.Runtime.MembershipService;
using Orleans.Serialization;
using Xunit;

namespace UnitTests.MembershipTests
{
/// <summary>
/// Tests for the in-memory membership table used by development clustering.
/// </summary>
[TestCategory("BVT"), TestCategory("Membership")]
public class InMemoryMembershipTableTests
{
private readonly InMemoryMembershipTable table;

public InMemoryMembershipTableTests()
{
var services = new ServiceCollection();
services.AddSerializer();
var serviceProvider = services.BuildServiceProvider();
var deepCopier = serviceProvider.GetRequiredService<DeepCopier>();
table = new InMemoryMembershipTable(deepCopier);
}

[Fact]
public void CleanupDefunctSiloEntries_RemovesNonActiveOldEntries()
{
var tableVersion = table.ReadTableVersion();

// Add old dead entry (should be removed)
var deadEntry = CreateEntry(SiloStatus.Dead, daysOld: 10);
table.Insert(deadEntry, tableVersion);
tableVersion = table.ReadTableVersion();

// Add old joining entry (should be removed)
var joiningEntry = CreateEntry(SiloStatus.Joining, daysOld: 10);
table.Insert(joiningEntry, tableVersion);
tableVersion = table.ReadTableVersion();

// Add old active entry (should NOT be removed)
var activeEntry = CreateEntry(SiloStatus.Active, daysOld: 10);
table.Insert(activeEntry, tableVersion);
tableVersion = table.ReadTableVersion();

// Add new entry with current timestamp (should NOT be removed regardless of status)
var newEntry = CreateEntry(SiloStatus.Dead, daysOld: 0);
table.Insert(newEntry, tableVersion);

var cutoff = DateTimeOffset.UtcNow.AddDays(-5);
table.CleanupDefunctSiloEntries(cutoff);

var data = table.ReadAll();
Assert.Equal(2, data.Members.Count);
Assert.Contains(data.Members, m => m.Item1.SiloAddress.Equals(activeEntry.SiloAddress));
Assert.Contains(data.Members, m => m.Item1.SiloAddress.Equals(newEntry.SiloAddress));
Assert.DoesNotContain(data.Members, m => m.Item1.SiloAddress.Equals(deadEntry.SiloAddress));
Assert.DoesNotContain(data.Members, m => m.Item1.SiloAddress.Equals(joiningEntry.SiloAddress));
}

[Fact]
public void CleanupDefunctSiloEntries_RemovesAllNonActiveStatuses()
{
foreach (var status in Enum.GetValues<SiloStatus>())
{
if (status == SiloStatus.Active)
{
continue;
}

var entry = CreateEntry(status, daysOld: 10);
table.Insert(entry, table.ReadTableVersion());
}

var cutoff = DateTimeOffset.UtcNow.AddDays(-5);
table.CleanupDefunctSiloEntries(cutoff);

var data = table.ReadAll();
Assert.Empty(data.Members);
}

[Fact]
public void CleanupDefunctSiloEntries_PreservesActiveEntries()
{
var tableVersion = table.ReadTableVersion();
var activeEntry = CreateEntry(SiloStatus.Active, daysOld: 30);
table.Insert(activeEntry, tableVersion);

var cutoff = DateTimeOffset.UtcNow.AddDays(-5);
table.CleanupDefunctSiloEntries(cutoff);

var data = table.ReadAll();
Assert.Single(data.Members);
Assert.Equal(activeEntry.SiloAddress, data.Members[0].Item1.SiloAddress);
}

private static int _portCounter = 10000;

private static MembershipEntry CreateEntry(SiloStatus status, int daysOld)
{
var port = Interlocked.Increment(ref _portCounter);
var siloAddress = SiloAddress.New(new IPEndPoint(IPAddress.Loopback, port), 0);
var now = DateTime.UtcNow.AddDays(-daysOld);
return new MembershipEntry
{
SiloAddress = siloAddress,
HostName = "localhost",
SiloName = $"TestSilo-{port}",
Status = status,
StartTime = now,
IAmAliveTime = now,
};
}
}
}
Loading