diff --git a/src/Orleans.Runtime/Configuration/Options/GrainDirectoryOptions.cs b/src/Orleans.Runtime/Configuration/Options/GrainDirectoryOptions.cs index f3509a3df3..bec12f79b6 100644 --- a/src/Orleans.Runtime/Configuration/Options/GrainDirectoryOptions.cs +++ b/src/Orleans.Runtime/Configuration/Options/GrainDirectoryOptions.cs @@ -1,99 +1,117 @@ - -using System; using Orleans.Runtime.GrainDirectory; -namespace Orleans.Configuration +namespace Orleans.Configuration; + +public class GrainDirectoryOptions { - public class GrainDirectoryOptions + /// + /// Configuration type that controls the type of the grain directory caching algorithm that silo use. + /// + public enum CachingStrategyType { - /// - /// Configuration type that controls the type of the grain directory caching algorithm that silo use. - /// - public enum CachingStrategyType - { - /// Don't cache. - None, - /// Standard fixed-size LRU. - LRU, - /// Adaptive caching with fixed maximum size and refresh. This option should be used in production. - [Obsolete("Adaptive caching is deprecated in favor of LRU and will be removed in a future version. This value is now an alias for LRU.")] - Adaptive, - /// Custom cache implementation, configured by registering an implementation in the dependency injection container. - Custom - } + /// Don't cache. + None, + /// Standard fixed-size LRU. + LRU, + /// Adaptive caching with fixed maximum size and refresh. This option should be used in production. + [Obsolete("Adaptive caching is deprecated in favor of LRU and will be removed in a future version. This value is now an alias for LRU.")] + Adaptive, + /// Custom cache implementation, configured by registering an implementation in the dependency injection container. + Custom + } - /// - /// Gets or sets the caching strategy to use. - /// The options are None, which means don't cache directory entries locally; - /// LRU, which indicates that a standard fixed-size least recently used strategy should be used; and - /// Adaptive, which indicates that an adaptive strategy with a fixed maximum size should be used. - /// The LRU strategy is used by default. - /// - public CachingStrategyType CachingStrategy { get; set; } = DEFAULT_CACHING_STRATEGY; + /// + /// Gets or sets the caching strategy to use. + /// The options are None, which means don't cache directory entries locally; + /// LRU, which indicates that a standard fixed-size least recently used strategy should be used; and + /// Adaptive, which indicates that an adaptive strategy with a fixed maximum size should be used. + /// The LRU strategy is used by default. + /// + public CachingStrategyType CachingStrategy { get; set; } = DEFAULT_CACHING_STRATEGY; - /// - /// The default value for . - /// - public const CachingStrategyType DEFAULT_CACHING_STRATEGY = CachingStrategyType.LRU; + /// + /// The default value for . + /// + public const CachingStrategyType DEFAULT_CACHING_STRATEGY = CachingStrategyType.LRU; - /// - /// Gets or sets the maximum number of grains to cache directory information for. - /// - public int CacheSize { get; set; } = DEFAULT_CACHE_SIZE; + /// + /// Gets or sets the maximum number of grains to cache directory information for. + /// + public int CacheSize { get; set; } = DEFAULT_CACHE_SIZE; - /// - /// The default value for . - /// - public const int DEFAULT_CACHE_SIZE = 1_000_000; + /// + /// The default value for . + /// + public const int DEFAULT_CACHE_SIZE = 1_000_000; - /// - /// Gets or sets the initial (minimum) time, in seconds, to keep a cache entry before revalidating. - /// - [Obsolete("InitialCacheTTL is deprecated and will be removed in a future version.")] - public TimeSpan InitialCacheTTL { get; set; } = DEFAULT_INITIAL_CACHE_TTL; + /// + /// Gets or sets the initial (minimum) time, in seconds, to keep a cache entry before revalidating. + /// + [Obsolete("InitialCacheTTL is deprecated and will be removed in a future version.")] + public TimeSpan InitialCacheTTL { get; set; } = DEFAULT_INITIAL_CACHE_TTL; - /// - /// The default value for . - /// - [Obsolete("DEFAULT_INITIAL_CACHE_TTL is deprecated and will be removed in a future version.")] - public static readonly TimeSpan DEFAULT_INITIAL_CACHE_TTL = TimeSpan.FromSeconds(30); + /// + /// The default value for . + /// + [Obsolete("DEFAULT_INITIAL_CACHE_TTL is deprecated and will be removed in a future version.")] + public static readonly TimeSpan DEFAULT_INITIAL_CACHE_TTL = TimeSpan.FromSeconds(30); - /// - /// Gets or sets the maximum time, in seconds, to keep a cache entry before revalidating. - /// - [Obsolete("MaximumCacheTTL is deprecated and will be removed in a future version.")] - public TimeSpan MaximumCacheTTL { get; set; } = DEFAULT_MAXIMUM_CACHE_TTL; + /// + /// Gets or sets the maximum time, in seconds, to keep a cache entry before revalidating. + /// + [Obsolete("MaximumCacheTTL is deprecated and will be removed in a future version.")] + public TimeSpan MaximumCacheTTL { get; set; } = DEFAULT_MAXIMUM_CACHE_TTL; - /// - /// The default value for . - /// - [Obsolete("DEFAULT_MAXIMUM_CACHE_TTL is deprecated and will be removed in a future version.")] - public static readonly TimeSpan DEFAULT_MAXIMUM_CACHE_TTL = TimeSpan.FromSeconds(240); + /// + /// The default value for . + /// + [Obsolete("DEFAULT_MAXIMUM_CACHE_TTL is deprecated and will be removed in a future version.")] + public static readonly TimeSpan DEFAULT_MAXIMUM_CACHE_TTL = TimeSpan.FromSeconds(240); - /// - /// Gets or sets the factor by which cache entry TTLs should be extended when they are found to be stable. - /// - [Obsolete("CacheTTLExtensionFactor is deprecated and will be removed in a future version.")] - public double CacheTTLExtensionFactor { get; set; } = DEFAULT_TTL_EXTENSION_FACTOR; + /// + /// Gets or sets the factor by which cache entry TTLs should be extended when they are found to be stable. + /// + [Obsolete("CacheTTLExtensionFactor is deprecated and will be removed in a future version.")] + public double CacheTTLExtensionFactor { get; set; } = DEFAULT_TTL_EXTENSION_FACTOR; - /// - /// The default value for . - /// - [Obsolete("DEFAULT_TTL_EXTENSION_FACTOR is deprecated and will be removed in a future version.")] - public const double DEFAULT_TTL_EXTENSION_FACTOR = 2.0; + /// + /// The default value for . + /// + [Obsolete("DEFAULT_TTL_EXTENSION_FACTOR is deprecated and will be removed in a future version.")] + public const double DEFAULT_TTL_EXTENSION_FACTOR = 2.0; - /// - /// Gets or sets the time span between when we have added an entry for an activation to the grain directory and when we are allowed - /// to conditionally remove that entry. - /// Conditional deregistration is used for lazy clean-up of activations whose prompt deregistration failed for some reason (e.g., message failure). - /// This should always be at least one minute, since we compare the times on the directory partition, so message delays and clcks skues have - /// to be allowed. - /// - public TimeSpan LazyDeregistrationDelay { get; set; } = DEFAULT_UNREGISTER_RACE_DELAY; + /// + /// Gets or sets the time span between when we have added an entry for an activation to the grain directory and when we are allowed + /// to conditionally remove that entry. + /// Conditional deregistration is used for lazy clean-up of activations whose prompt deregistration failed for some reason (e.g., message failure). + /// This should always be at least one minute, since we compare the times on the directory partition, so message delays and clcks skues have + /// to be allowed. + /// + public TimeSpan LazyDeregistrationDelay { get; set; } = DEFAULT_UNREGISTER_RACE_DELAY; - /// - /// The default value for . - /// - public static readonly TimeSpan DEFAULT_UNREGISTER_RACE_DELAY = TimeSpan.FromMinutes(1); - } + /// + /// The default value for . + /// + public static readonly TimeSpan DEFAULT_UNREGISTER_RACE_DELAY = TimeSpan.FromMinutes(1); + + /// + /// Gets or sets the duration for the safety lease hold applied after an ungraceful silo failure. + /// This duration applies in two scenarios: + /// + /// When a specific silo crashes ungracefully, grain lease holds prevent individual re-registration of its grains for this duration. + /// When a directory partition can not acquire a snapshot from a previous owner, range lease holds prevent new registrations in that whole range for this duration. + /// + /// + /// + /// Depending on the value of this, the duration is understood as: + /// + /// SafetyLeaseHoldDuration > TimeSpan.Zero - The lease duration is explicitly controlled by the user. + /// SafetyLeaseHoldDuration = TimeSpan.Zero. No leases are placed at all, effectively nullifying this safety option. + /// SafetyLeaseHoldDuration = null - The system computes a lease duration as: + /// 2 × × . + /// This is the default value, and is designed to be long enough to allow for failure detection and cluster stabilization. + /// + /// + /// + public TimeSpan? SafetyLeaseHoldDuration { get; set; } } diff --git a/src/Orleans.Runtime/GrainDirectory/DirectoryResult.cs b/src/Orleans.Runtime/GrainDirectory/DirectoryResult.cs index 1fbc4d9c61..5ba81c9a26 100644 --- a/src/Orleans.Runtime/GrainDirectory/DirectoryResult.cs +++ b/src/Orleans.Runtime/GrainDirectory/DirectoryResult.cs @@ -4,22 +4,40 @@ namespace Orleans.Runtime; internal static class DirectoryResult { - public static DirectoryResult FromResult(T result, MembershipVersion version) => new DirectoryResult(result, version); - public static DirectoryResult RefreshRequired(MembershipVersion version) => new DirectoryResult(default, version); + public static DirectoryResult FromResult(T result, MembershipVersion version) => new(result, version); + public static DirectoryResult RefreshRequired(MembershipVersion version) => new(default, version); + public static DirectoryResult RetryAfter(TimeSpan retryAfter) => new(retryAfter); } [GenerateSerializer, Alias("DirectoryResult`1"), Immutable] -internal readonly struct DirectoryResult(T? result, MembershipVersion version) +internal readonly struct DirectoryResult { [Id(0)] - private readonly T? _result = result; + private readonly T? _result; [Id(1)] - public readonly MembershipVersion Version = version; + public readonly MembershipVersion Version; + + /// + /// When greater than , indicates that the caller should retry after the specified delay. + /// + [Id(2)] + public readonly TimeSpan RetryAfterDelay; + + public DirectoryResult(T? result, MembershipVersion version) + { + _result = result; + Version = version; + } + + public DirectoryResult(TimeSpan retryAfter) + { + RetryAfterDelay = retryAfter; + } public bool TryGetResult(MembershipVersion version, [NotNullWhen(true)] out T? result) { - if (Version == version) + if (RetryAfterDelay <= TimeSpan.Zero && Version == version) { result = _result!; return true; diff --git a/src/Orleans.Runtime/GrainDirectory/DistributedGrainDirectory.cs b/src/Orleans.Runtime/GrainDirectory/DistributedGrainDirectory.cs index c6b8074f06..797bd44e6d 100644 --- a/src/Orleans.Runtime/GrainDirectory/DistributedGrainDirectory.cs +++ b/src/Orleans.Runtime/GrainDirectory/DistributedGrainDirectory.cs @@ -1,13 +1,12 @@ -using System; -using System.Collections.Generic; using System.Collections.Immutable; -using System.Linq; +using System.Diagnostics; using System.Runtime.CompilerServices; -using System.Threading; -using System.Threading.Tasks; +using System.Timers; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; using Orleans.Concurrency; +using Orleans.Configuration; using Orleans.GrainDirectory; using Orleans.Internal; using Orleans.Runtime.Internal; @@ -63,6 +62,8 @@ internal sealed partial class DistributedGrainDirectory : SystemTarget, IGrainDi private readonly IServiceProvider _serviceProvider; private readonly ImmutableArray _partitions; private readonly CancellationTokenSource _stoppedCts = new(); + private readonly TimeSpan _leaseHoldDuration; + private readonly TimeProvider _timeProvider; internal CancellationToken OnStoppedToken => _stoppedCts.Token; internal ClusterMembershipSnapshot ClusterMembershipSnapshot => _membershipService.CurrentView.ClusterMembershipSnapshot; @@ -75,25 +76,39 @@ internal sealed partial class DistributedGrainDirectory : SystemTarget, IGrainDi // precise by also tracking the sets of ranges which need to be recovered, but that complicates things somewhat since it would require tracking the ranges // for each recovery version. private long _recoveryMembershipVersion; - private Task _runTask = Task.CompletedTask; - private ActivationDirectory _localActivations; + private readonly ActivationDirectory _localActivations; private GrainDirectoryResolver? _grainDirectoryResolver; + private Task? _runTask; + private Task? _leaseCleanupTask; public DistributedGrainDirectory( DirectoryMembershipService membershipService, ILogger logger, IServiceProvider serviceProvider, IInternalGrainFactory grainFactory, + IOptions directoryOptions, + IOptions membershipOptions, + TimeProvider timeProvider, SystemTargetShared shared) : base(Constants.GrainDirectoryType, shared) { _localActivations = shared.ActivationDirectory; _serviceProvider = serviceProvider; _membershipService = membershipService; _logger = logger; + _timeProvider = timeProvider; + + _leaseHoldDuration = directoryOptions.Value.SafetyLeaseHoldDuration switch + { + null => 2 * membershipOptions.Value.ProbeTimeout * membershipOptions.Value.NumMissedProbesLimit, + TimeSpan duration when duration >= TimeSpan.Zero => duration, + _ => throw new InvalidOperationException("Lease hold duration must be non-negative.") + }; + var partitions = ImmutableArray.CreateBuilder(DirectoryMembershipSnapshot.PartitionsPerSilo); + for (var i = 0; i < DirectoryMembershipSnapshot.PartitionsPerSilo; i++) { - partitions.Add(new GrainDirectoryPartition(i, this, grainFactory, shared)); + partitions.Add(new GrainDirectoryPartition(i, this, _leaseHoldDuration, grainFactory, timeProvider, shared)); } _partitions = partitions.ToImmutable(); @@ -176,6 +191,14 @@ private async Task InvokeAsync( continue; } + if (invokeResult.RetryAfterDelay > TimeSpan.Zero) + { + // A safety lease hold is active for this grain or range. + // Wait for the suggested duration before retrying. + await Task.Delay(invokeResult.RetryAfterDelay, _timeProvider, cancellationToken); + continue; + } + if (initialRecoveryMembershipVersion != _recoveryMembershipVersion) { // If the recovery version changed, perform a view refresh and re-issue the operation. @@ -312,7 +335,16 @@ void ILifecycleParticipant.Participate(ISiloLifecycle observer) Task OnRuntimeInitializeStart(CancellationToken cancellationToken) { using var _ = new ExecutionContextSuppressor(); - WorkItemGroup.QueueAction(() => _runTask = ProcessMembershipUpdates()); + + WorkItemGroup.QueueAction(() => + { + _runTask = ProcessMembershipUpdates(); + + if (_leaseHoldDuration > TimeSpan.Zero) + { + _leaseCleanupTask = RequestExpiredLeaseCleanups(); + } + }); return Task.CompletedTask; } @@ -320,11 +352,14 @@ Task OnRuntimeInitializeStart(CancellationToken cancellationToken) async Task OnRuntimeInitializeStop(CancellationToken cancellationToken) { _stoppedCts.Cancel(); + if (_runTask is { } task) { // Try to wait for hand-off to complete. await this.RunOrQueueTask(async () => await task.WaitAsync(cancellationToken).SuppressThrowing()); } + + // No need to wait on the cleanup task since it does not have any external effects. } async Task OnShuttingDown(CancellationToken token) @@ -359,9 +394,10 @@ private async Task ProcessMembershipUpdates() { if (change.Status == SiloStatus.Dead) { + var previousStatus = previousUpdate.GetSiloStatus(change.SiloAddress); foreach (var partition in _partitions) { - tasks.Add(partition.OnSiloRemovedFromClusterAsync(change)); + tasks.Add(partition.OnSiloRemovedFromClusterAsync(change, previousStatus)); } } } @@ -396,6 +432,38 @@ private async Task ProcessMembershipUpdates() await Task.WhenAll(tasks).SuppressThrowing(); } + private async Task RequestExpiredLeaseCleanups() + { + Debug.Assert(_leaseHoldDuration > TimeSpan.Zero); + + // We request cleanups periodically to not let expired leases linger in the directory for too long. + // We do it here as opposed to in the partitions to avoid having 30 (by default, maybe more) timers. + var period = 1.1 * _leaseHoldDuration; + + if (period < TimeSpan.FromMinutes(1)) + { + // We create a lower-bound to avoid creating too much overhead in the partitions. + period = TimeSpan.FromMinutes(1); + } + + using var timer = new PeriodicTimer(period); + + try + { + while (await timer.WaitForNextTickAsync(_stoppedCts.Token)) + { + foreach (var partition in _partitions) + { + partition.CleanupExpiredLeases(); + } + } + } + catch (OperationCanceledException) when (_stoppedCts.IsCancellationRequested) + { + // Ignore + } + } + SiloAddress? ITestHooks.GetPrimaryForGrain(GrainId grainId) { _membershipService.CurrentView.TryGetOwner(grainId, out var owner, out _); diff --git a/src/Orleans.Runtime/GrainDirectory/GrainDirectoryPartition.Interface.cs b/src/Orleans.Runtime/GrainDirectory/GrainDirectoryPartition.Interface.cs index 9806995a88..d43d26c386 100644 --- a/src/Orleans.Runtime/GrainDirectory/GrainDirectoryPartition.Interface.cs +++ b/src/Orleans.Runtime/GrainDirectory/GrainDirectoryPartition.Interface.cs @@ -1,7 +1,5 @@ -using System; using System.Diagnostics; using System.Runtime.InteropServices; -using System.Threading.Tasks; using Microsoft.Extensions.Logging; namespace Orleans.Runtime.GrainDirectory; @@ -21,6 +19,47 @@ async ValueTask> IGrainDirectoryPartition.Register } DebugAssertOwnership(address.GrainId); + + if (_leaseHoldDuration > TimeSpan.Zero) + { + var utcNow = _timeProvider.GetUtcNow().UtcDateTime; + var rangeHash = address.GrainId.GetUniformHashCode(); + + // Range lease holds + for (var i = _rangeLeaseHolds.Count - 1; i >= 0; i--) + { + var (lockedRange, expiration) = _rangeLeaseHolds[i]; + + if (utcNow >= expiration) + { + // We use this opportunity to cleanup this expired range lease hold. + _rangeLeaseHolds.RemoveAt(i); + continue; + } + + // If it is still active, does it block this request? + if (lockedRange.Contains(rangeHash)) + { + return DirectoryResult.RetryAfter(expiration - utcNow); + } + } + + // Grain lease holds + if (_directory.TryGetValue(address.GrainId, out var existingActivation)) + { + if (_siloLeaseHolds.TryGetValue(existingActivation.SiloAddress!, out var expiration) && utcNow < expiration) + { + // This grain belongs to this partition, and the activation is sitting on a silo that has an active lease hold. + // We need to check if the request includes the previous activation id, and if it does it's a valid update/override, + // otherwise it's a new activation trying to "steal" the id while the lease is active, so we reject it! + if (currentRegistration is null || !existingActivation.Matches(currentRegistration)) + { + return DirectoryResult.RetryAfter(expiration - utcNow); + } + } + } + } + return DirectoryResult.FromResult(RegisterCore(address, currentRegistration), version); } diff --git a/src/Orleans.Runtime/GrainDirectory/GrainDirectoryPartition.cs b/src/Orleans.Runtime/GrainDirectory/GrainDirectoryPartition.cs index 5f25237ef7..6179da5b96 100644 --- a/src/Orleans.Runtime/GrainDirectory/GrainDirectoryPartition.cs +++ b/src/Orleans.Runtime/GrainDirectory/GrainDirectoryPartition.cs @@ -1,11 +1,6 @@ -using System; -using System.Collections.Generic; using System.Diagnostics; using System.Diagnostics.CodeAnalysis; using System.Globalization; -using System.Linq; -using System.Threading; -using System.Threading.Tasks; using Microsoft.CodeAnalysis; using Microsoft.Extensions.Logging; using Orleans.Concurrency; @@ -34,6 +29,8 @@ internal sealed partial class GrainDirectoryPartition : SystemTarget, IGrainDire (previous, proposed) => proposed.Version >= previous.Version, _ => { }); + private readonly TimeProvider _timeProvider; + // Ranges which cannot be served currently, eg because the partition is currently transferring them from a previous owner. // Requests in these ranges must wait for the range to become available. private readonly List<(RingRange Range, MembershipVersion Version, TaskCompletionSource Completion)> _rangeLocks = []; @@ -47,11 +44,17 @@ internal sealed partial class GrainDirectoryPartition : SystemTarget, IGrainDire private RingRange _currentRange; + private readonly TimeSpan _leaseHoldDuration; + private readonly List<(RingRange Range, DateTime Expiration)> _rangeLeaseHolds = []; + private readonly Dictionary _siloLeaseHolds = []; + /// The index of this partition on this silo. Each silo hosts a fixed number of dynamically sized partitions. public GrainDirectoryPartition( int partitionIndex, DistributedGrainDirectory owner, + TimeSpan leaseHoldDuration, IInternalGrainFactory grainFactory, + TimeProvider timeProvider, SystemTargetShared shared) : base(CreateGrainId(shared.SiloAddress, partitionIndex), shared) { _partitionIndex = partitionIndex; @@ -59,6 +62,8 @@ public GrainDirectoryPartition( _grainFactory = grainFactory; _id = shared.SiloAddress; _logger = shared.LoggerFactory.CreateLogger(); + _leaseHoldDuration = leaseHoldDuration; + _timeProvider = timeProvider; shared.ActivationDirectory.RecordNewTarget(this); } @@ -238,31 +243,53 @@ await this.RunOrQueueTask(async () => } }); } - internal Task OnSiloRemovedFromClusterAsync(ClusterMember change) => + + internal Task OnSiloRemovedFromClusterAsync(ClusterMember change, SiloStatus previousStatus) => this.QueueAction( - static state => state.Self.OnSiloRemovedFromCluster(state.Change), - (Self: this, Change: change), + static state => state.Self.OnSiloRemovedFromCluster(state.Change, state.PreviousStatus), + (Self: this, Change: change, PreviousStatus: previousStatus), nameof(OnSiloRemovedFromCluster)); - private void OnSiloRemovedFromCluster(ClusterMember change) + private void OnSiloRemovedFromCluster(ClusterMember change, SiloStatus previousStatus) { GrainRuntime.CheckRuntimeContext(this); - var toRemove = new List(); - foreach (var entry in _directory) + + // We need to detect the shutdown type: + // If it was ShuttingDown, it surrendered its ownership gracefully. + // If it was Active (or Joining) and suddenly became Dead, it crashed. + + if (previousStatus is not SiloStatus.ShuttingDown && _leaseHoldDuration > TimeSpan.Zero) { - if (change.SiloAddress.Equals(entry.Value.SiloAddress)) - { - toRemove.Add(entry.Value); - } - } + // Instead of just deleting, we mark it as tombstoned. + // This prevents a new activation on a healthy silo from registering + // until we are sure the dead silo has actually stopped processing. + + var expiration = _timeProvider.GetUtcNow().UtcDateTime.Add(_leaseHoldDuration); - if (toRemove.Count > 0) + _siloLeaseHolds[change.SiloAddress] = expiration; + + LogDebugLeaseHoldForSilo(_logger, change.SiloAddress, expiration); + } + else { - LogDebugDeletingEntries(_logger, toRemove.Count, change.SiloAddress); + var toRemove = new List(); + + foreach (var entry in _directory) + { + if (change.SiloAddress.Equals(entry.Value.SiloAddress)) + { + toRemove.Add(entry.Value); + } + } - foreach (var grainAddress in toRemove) + if (toRemove.Count > 0) { - DeregisterCore(grainAddress); + LogDebugDeletingEntries(_logger, toRemove.Count, change.SiloAddress); + + foreach (var grainAddress in toRemove) + { + DeregisterCore(grainAddress); + } } } @@ -464,10 +491,21 @@ private async Task AcquireRangeAsync(DirectoryMembershipSnapshot previous, Direc var recovered = false; if (!success) { + if (_leaseHoldDuration > TimeSpan.Zero) + { + // We pessimistically asssume if snapshot transfer failed, than safety is needed. + var expiration = _timeProvider.GetUtcNow().UtcDateTime.Add(_leaseHoldDuration); + _rangeLeaseHolds.Add((addedRange, expiration)); + LogWarningLeaseHoldForRange(_logger, addedRange, expiration); + } + // Wait for previous versions to be unlocked before proceeding. await WaitForRange(addedRange, previous.Version); + // Proceed to recovery (fetching from other silos), + // but register calls will now be blocked by range lease holds. await RecoverPartitionRange(current, addedRange); + recovered = true; } @@ -750,11 +788,99 @@ async ValueTask IGrainDirectoryTestHooks.CheckIntegrityAsync() } } + internal void CleanupExpiredLeases() => this.QueueAction(static state => + state.CleanupExpiredLeasesCore(), this, nameof(CleanupExpiredLeases)); + + private void CleanupExpiredLeasesCore() + { + GrainRuntime.CheckRuntimeContext(this); + + try + { + var utcNow = _timeProvider.GetUtcNow().UtcDateTime; + + if (_rangeLeaseHolds.Count > 0) + { + var removed = _rangeLeaseHolds.RemoveAll(x => utcNow >= x.Expiration); + if (removed > 0) + { + LogDebugPrunedExpiredRangeLeaseHolds(_logger, removed); + } + } + + if (_siloLeaseHolds.Count > 0) + { + var expiredSilos = _siloLeaseHolds + .Where(kvp => utcNow >= kvp.Value) + .Select(kvp => kvp.Key) + .ToList(); + + if (expiredSilos.Count > 0) + { + // These are the grains which we were supposed to have removed when the silo was marked as dead, + // but we kept them around until we were sure the silo was actually dead. + + var removedCount = 0; + + foreach (var kvp in _directory) + { + if (expiredSilos.Contains(kvp.Value.SiloAddress!)) + { + if (_directory.Remove(kvp.Key)) + { + removedCount++; + } + } + } + + foreach (var silo in expiredSilos) + { + _siloLeaseHolds.Remove(silo); + } + + LogDebugPrunedExpiredSiloLeaseHolds(_logger, expiredSilos.Count, removedCount); + } + } + } + catch (Exception ex) + { + LogErrorLeaseCleanup(_logger, ex, _id); + } + } private sealed record class PartitionSnapshotState( MembershipVersion DirectoryMembershipVersion, List GrainAddresses, HashSet<(SiloAddress SiloAddress, int PartitionIndex)> TransferPartners); + [LoggerMessage( + Level = LogLevel.Debug, + Message = "Placed lease hold on dead silo {SiloAddress} until {Expiration}.")] + private static partial void LogDebugLeaseHoldForSilo(ILogger logger, SiloAddress siloAddress, DateTime expiration); + + [LoggerMessage( + Level = LogLevel.Debug, + Message = "Pruned {SiloCount} expired silo lease holds, removing {GrainCount} dead grain activations from the directory.")] + private static partial void LogDebugPrunedExpiredSiloLeaseHolds(ILogger logger, int siloCount, int grainCount); + + [LoggerMessage( + Level = LogLevel.Warning, + Message = "Grains in the range {Range} have been put under a lease until {Expiration}." + )] + private static partial void LogWarningLeaseHoldForRange(ILogger logger, RingRange range, DateTime expiration); + + + [LoggerMessage( + Level = LogLevel.Debug, + Message = "Pruned {Count} expired range lease holds." + )] + private static partial void LogDebugPrunedExpiredRangeLeaseHolds(ILogger logger, int count); + + [LoggerMessage( + Level = LogLevel.Error, + Message = "Error during lease hold cleanup on silo '{Silo}'." + )] + private static partial void LogErrorLeaseCleanup(ILogger logger, Exception exception, SiloAddress silo); + [LoggerMessage( Level = LogLevel.Trace, Message = "GetSnapshotAsync('{Version}', '{RangeVersion}', '{Range}')" diff --git a/test/Orleans.GrainDirectory.Tests/GrainDirectory/GrainDirectoryLeaseTests.cs b/test/Orleans.GrainDirectory.Tests/GrainDirectory/GrainDirectoryLeaseTests.cs new file mode 100644 index 0000000000..5345bf68ad --- /dev/null +++ b/test/Orleans.GrainDirectory.Tests/GrainDirectory/GrainDirectoryLeaseTests.cs @@ -0,0 +1,263 @@ +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Time.Testing; +using Orleans.Configuration; +using Orleans.Runtime.GrainDirectory; +using Orleans.Runtime.Placement; +using Orleans.TestingHost; +using Xunit; + +#nullable enable + +namespace UnitTests.GrainDirectory; + +public interface ILeaseTestGrain : IGrainWithIntegerKey +{ + Task GetAddress(); +} + +public class LeaseTestGrain : Grain, ILeaseTestGrain +{ + public Task GetAddress() => Task.FromResult(Runtime.SiloAddress); +} + +[TestCategory("Lease"), TestCategory("Directory")] +public class GrainDirectoryLeaseTests +{ + public static readonly FakeTimeProvider TimeProvider = new(DateTime.UtcNow); + // The value doesnt matter we will advance time manually. + public static readonly TimeSpan LeaseHoldDuration = TimeSpan.FromSeconds(5); + + [Fact] + public async Task BlocksReactivations_AfterUngracefulShutdown() + { + var builder = new TestClusterBuilder(2); + + builder.AddSiloBuilderConfigurator(); + + var cluster = builder.Build(); + await cluster.DeployAsync(); + + try + { + var primary = cluster.Primary; + var secondary = cluster.SecondarySilos[0]; + + RequestContext.Set(IPlacementDirector.PlacementHintKey, secondary.SiloAddress); + + var leaseGrain = cluster.GrainFactory.GetGrain(0); + Assert.Equal(await leaseGrain.GetAddress(), secondary.SiloAddress); + + await cluster.KillSiloAsync(secondary); + + // Bypass the catalog and hit the directory directly to observe lease hold behavior. + var directory = ((InProcessSiloHandle)primary).SiloHost.Services.GetRequiredService().DefaultGrainDirectory; + var fakeAddress = GrainAddress.NewActivationAddress(primary.SiloAddress, leaseGrain.GetGrainId()); + + // The registration should block while the lease hold is active. + var registerTask = directory.Register(fakeAddress); + await Task.Delay(200); + Assert.False(registerTask.IsCompleted, "Registration should be blocked by the lease hold."); + + // Advance time past the lease duration so the retry succeeds. + TimeProvider.Advance(LeaseHoldDuration); + var result = await registerTask; + Assert.NotNull(result); + + // The grain should now reactivate on the primary since it's the only silo alive. + Assert.Equal(primary.SiloAddress, await leaseGrain.GetAddress()); + } + finally + { + await cluster.StopAllSilosAsync(); + await cluster.DisposeAsync(); + } + } + + [Fact] + public async Task GracefulShutdown_DoesNotCreateLeaseHold() + { + var builder = new TestClusterBuilder(2); + builder.AddSiloBuilderConfigurator(); + + var cluster = builder.Build(); + await cluster.DeployAsync(); + + try + { + var primary = cluster.Primary; + var secondary = cluster.SecondarySilos[0]; + + RequestContext.Set(IPlacementDirector.PlacementHintKey, secondary.SiloAddress); + + var leaseGrain = cluster.GrainFactory.GetGrain(10); + Assert.Equal(secondary.SiloAddress, await leaseGrain.GetAddress()); + + // Graceful shutdown transitions through ShuttingDown → Dead, + // which does not create a silo lease hold. + await cluster.StopSiloAsync(secondary); + + var directory = ((InProcessSiloHandle)primary).SiloHost.Services + .GetRequiredService().DefaultGrainDirectory; + var fakeAddress = GrainAddress.NewActivationAddress(primary.SiloAddress, leaseGrain.GetGrainId()); + + // Should succeed immediately — no lease hold for graceful shutdown. + var result = await directory.Register(fakeAddress); + Assert.NotNull(result); + Assert.Equal(primary.SiloAddress, result.SiloAddress); + } + finally + { + await cluster.StopAllSilosAsync(); + await cluster.DisposeAsync(); + } + } + + [Fact] + public async Task DisabledLeaseHold_AllowsImmediateReregistration() + { + var builder = new TestClusterBuilder(2); + builder.AddSiloBuilderConfigurator(); + + var cluster = builder.Build(); + await cluster.DeployAsync(); + + try + { + var primary = cluster.Primary; + var secondary = cluster.SecondarySilos[0]; + + RequestContext.Set(IPlacementDirector.PlacementHintKey, secondary.SiloAddress); + + var leaseGrain = cluster.GrainFactory.GetGrain(20); + Assert.Equal(secondary.SiloAddress, await leaseGrain.GetAddress()); + + // Ungraceful kill, but leases are disabled (duration = Zero). + await cluster.KillSiloAsync(secondary); + + var directory = ((InProcessSiloHandle)primary).SiloHost.Services + .GetRequiredService().DefaultGrainDirectory; + var fakeAddress = GrainAddress.NewActivationAddress(primary.SiloAddress, leaseGrain.GetGrainId()); + + // Should succeed immediately — lease holds are disabled. + var result = await directory.Register(fakeAddress); + Assert.NotNull(result); + Assert.Equal(primary.SiloAddress, result.SiloAddress); + } + finally + { + await cluster.StopAllSilosAsync(); + await cluster.DisposeAsync(); + } + } + + [Fact] + public async Task LookupReturnsNull_DuringActiveLeaseHold() + { + var builder = new TestClusterBuilder(2); + builder.AddSiloBuilderConfigurator(); + + var cluster = builder.Build(); + await cluster.DeployAsync(); + + try + { + var primary = cluster.Primary; + var secondary = cluster.SecondarySilos[0]; + + RequestContext.Set(IPlacementDirector.PlacementHintKey, secondary.SiloAddress); + + var leaseGrain = cluster.GrainFactory.GetGrain(30); + Assert.Equal(secondary.SiloAddress, await leaseGrain.GetAddress()); + + await cluster.KillSiloAsync(secondary); + + var directory = ((InProcessSiloHandle)primary).SiloHost.Services + .GetRequiredService().DefaultGrainDirectory; + + // Lookup should return null: the entry is retained for the lease hold, + // but the silo is dead so the directory filters it out. + var result = await directory.Lookup(leaseGrain.GetGrainId()); + Assert.Null(result); + } + finally + { + await cluster.StopAllSilosAsync(); + await cluster.DisposeAsync(); + } + } + + [Fact] + public async Task BlocksMultipleGrains_AfterUngracefulShutdown() + { + var builder = new TestClusterBuilder(2); + builder.AddSiloBuilderConfigurator(); + + var cluster = builder.Build(); + await cluster.DeployAsync(); + + try + { + var primary = cluster.Primary; + var secondary = cluster.SecondarySilos[0]; + + // Place multiple grains on the secondary silo. + RequestContext.Set(IPlacementDirector.PlacementHintKey, secondary.SiloAddress); + var grain1 = cluster.GrainFactory.GetGrain(41); + var grain2 = cluster.GrainFactory.GetGrain(42); + var grain3 = cluster.GrainFactory.GetGrain(43); + Assert.Equal(secondary.SiloAddress, await grain1.GetAddress()); + Assert.Equal(secondary.SiloAddress, await grain2.GetAddress()); + Assert.Equal(secondary.SiloAddress, await grain3.GetAddress()); + + await cluster.KillSiloAsync(secondary); + + var directory = ((InProcessSiloHandle)primary).SiloHost.Services + .GetRequiredService().DefaultGrainDirectory; + + // All grains on the dead silo should be blocked by the lease hold. + var task1 = directory.Register(GrainAddress.NewActivationAddress(primary.SiloAddress, grain1.GetGrainId())); + var task2 = directory.Register(GrainAddress.NewActivationAddress(primary.SiloAddress, grain2.GetGrainId())); + var task3 = directory.Register(GrainAddress.NewActivationAddress(primary.SiloAddress, grain3.GetGrainId())); + await Task.Delay(200); + Assert.False(task1.IsCompleted, "Registration for grain1 should be blocked by the lease hold."); + Assert.False(task2.IsCompleted, "Registration for grain2 should be blocked by the lease hold."); + Assert.False(task3.IsCompleted, "Registration for grain3 should be blocked by the lease hold."); + + // After the lease expires, all registrations should complete. + TimeProvider.Advance(LeaseHoldDuration); + await Task.WhenAll(task1, task2, task3); + + Assert.Equal(primary.SiloAddress, await grain1.GetAddress()); + Assert.Equal(primary.SiloAddress, await grain2.GetAddress()); + Assert.Equal(primary.SiloAddress, await grain3.GetAddress()); + } + finally + { + await cluster.StopAllSilosAsync(); + await cluster.DisposeAsync(); + } + } + + private sealed class SiloBuilderConfigurator : ISiloConfigurator + { + public void Configure(ISiloBuilder siloBuilder) + { + siloBuilder.ConfigureServices(sp => sp.AddSingleton(TimeProvider)); + siloBuilder.Configure(o => o.SafetyLeaseHoldDuration = LeaseHoldDuration); +#pragma warning disable ORLEANSEXP003 + siloBuilder.AddDistributedGrainDirectory(); +#pragma warning restore ORLEANSEXP003 + } + } + + private sealed class DisabledLeaseSiloConfigurator : ISiloConfigurator + { + public void Configure(ISiloBuilder siloBuilder) + { + siloBuilder.Configure(o => o.SafetyLeaseHoldDuration = TimeSpan.Zero); +#pragma warning disable ORLEANSEXP003 + siloBuilder.AddDistributedGrainDirectory(); +#pragma warning restore ORLEANSEXP003 + } + } +} diff --git a/test/Orleans.GrainDirectory.Tests/Orleans.GrainDirectory.Tests.csproj b/test/Orleans.GrainDirectory.Tests/Orleans.GrainDirectory.Tests.csproj index 9625143c35..9d5f9182ba 100644 --- a/test/Orleans.GrainDirectory.Tests/Orleans.GrainDirectory.Tests.csproj +++ b/test/Orleans.GrainDirectory.Tests/Orleans.GrainDirectory.Tests.csproj @@ -7,6 +7,7 @@ +