Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions playground/ActivationSheddingToy/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,7 @@
builder.UseOrleans(orleans =>
{
orleans.UseLocalhostClustering();
#pragma warning disable ORLEANSEXP003 // Type is for evaluation purposes only and is subject to change or removal in future updates. Suppress this diagnostic to proceed.
orleans.AddDistributedGrainDirectory();
#pragma warning restore ORLEANSEXP003 // Type is for evaluation purposes only and is subject to change or removal in future updates. Suppress this diagnostic to proceed.
});

builder.Services.Configure<GrainCollectionOptions>(options =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,12 @@
namespace ChaoticCluster.Silo;

class SiloBuilderConfigurator : ISiloConfigurator
{
public void Configure(ISiloBuilder siloBuilder)
{
public void Configure(ISiloBuilder siloBuilder)
{
#pragma warning disable ORLEANSEXP003 // Type is for evaluation purposes only and is subject to change or removal in future updates. Suppress this diagnostic to proceed.
siloBuilder.AddDistributedGrainDirectory();
#pragma warning restore ORLEANSEXP003 // Type is for evaluation purposes only and is subject to change or removal in future updates. Suppress this diagnostic to proceed.
}
siloBuilder.AddDistributedGrainDirectory();
}
}

internal interface IMyTestGrain : IGrainWithIntegerKey
{
Expand Down
72 changes: 70 additions & 2 deletions src/Orleans.Core/Configuration/ServiceCollectionExtensions.cs
Original file line number Diff line number Diff line change
@@ -1,9 +1,76 @@
using System;
using System.Collections.Generic;
using System.Linq;
using Microsoft.Extensions.DependencyInjection;

namespace Orleans.Configuration.Internal
{
/// <summary>
/// A <see cref="ServiceDescriptor"/> subclass that tracks the underlying implementation type
/// used when registering a service via <see cref="ServiceCollectionExtensions.AddFromExisting"/>.
/// This allows service registrations to be identified and removed later based on their implementation type.
/// </summary>
internal sealed class TaggedServiceDescriptor : ServiceDescriptor
{
/// <summary>
/// Initializes a new instance of the <see cref="TaggedServiceDescriptor"/> class.
/// </summary>
/// <param name="serviceType">The type of the service.</param>
/// <param name="factory">The factory used for creating service instances.</param>
/// <param name="lifetime">The lifetime of the service.</param>
/// <param name="implementationType">The underlying implementation type this registration was created from.</param>
public TaggedServiceDescriptor(
Type serviceType,
Func<IServiceProvider, object> factory,
ServiceLifetime lifetime,
Type implementationType)
: base(serviceType, factory, lifetime)
{
SourceImplementationType = implementationType;
}

/// <summary>
/// Gets the underlying implementation type that this service registration was created from.
/// </summary>
public Type SourceImplementationType { get; }

/// <summary>
/// Removes all service descriptors from the collection that were registered from the specified implementation type.
/// </summary>
/// <typeparam name="TImplementation">The implementation type to remove registrations for.</typeparam>
/// <param name="services">The service collection to remove from.</param>
public static void RemoveAllForImplementation<TImplementation>(IServiceCollection services)
{
RemoveAllForImplementation(services, typeof(TImplementation));
}

/// <summary>
/// Removes all service descriptors from the collection that were registered from the specified implementation type.
/// </summary>
/// <param name="services">The service collection to remove from.</param>
/// <param name="implementationType">The implementation type to remove registrations for.</param>
public static void RemoveAllForImplementation(IServiceCollection services, Type implementationType)
{
var toRemove = new List<ServiceDescriptor>();
foreach (var descriptor in services)
{
if (descriptor is TaggedServiceDescriptor tagged && tagged.SourceImplementationType == implementationType)
{
toRemove.Add(descriptor);
}
else if (descriptor.ServiceType == implementationType || descriptor.ImplementationType == implementationType)
{
toRemove.Add(descriptor);
}
}

foreach (var descriptor in toRemove)
{
services.Remove(descriptor);
}
}
}

/// <summary>
/// Extension methods for configuring dependency injection.
/// </summary>
Expand Down Expand Up @@ -43,10 +110,11 @@ public static void AddFromExisting(this IServiceCollection services, Type servic
throw new ArgumentNullException(nameof(implementation), $"Unable to find previously registered ServiceType of '{implementation.FullName}'");
}

var newRegistration = new ServiceDescriptor(
var newRegistration = new TaggedServiceDescriptor(
service,
sp => sp.GetRequiredService(implementation),
registration.Lifetime);
registration.Lifetime,
implementation);
services.Add(newRegistration);
}

Expand Down
2 changes: 0 additions & 2 deletions src/Orleans.Runtime/Core/InternalGrainRuntime.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ internal class InternalGrainRuntime(
GrainLocator grainLocator,
CompatibilityDirectorManager compatibilityDirectorManager,
IOptions<GrainCollectionOptions> collectionOptions,
ILocalGrainDirectory localGrainDirectory,
IActivationWorkingSet activationWorkingSet)
{
public InsideRuntimeClient RuntimeClient { get; } = catalog.RuntimeClient;
Expand All @@ -29,7 +28,6 @@ internal class InternalGrainRuntime(
public CompatibilityDirectorManager CompatibilityDirectorManager { get; } = compatibilityDirectorManager;
public GrainLocator GrainLocator { get; } = grainLocator;
public IOptions<GrainCollectionOptions> CollectionOptions { get; } = collectionOptions;
public ILocalGrainDirectory LocalGrainDirectory { get; } = localGrainDirectory;
public IActivationWorkingSet ActivationWorkingSet { get; } = activationWorkingSet;
}
}
2 changes: 1 addition & 1 deletion src/Orleans.Runtime/GrainDirectory/CachedGrainLocator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ namespace Orleans.Runtime.GrainDirectory
/// <summary>
/// Implementation of <see cref="IGrainLocator"/> that uses <see cref="IGrainDirectory"/> stores.
/// </summary>
internal class CachedGrainLocator : IGrainLocator, ILifecycleParticipant<ISiloLifecycle>, CachedGrainLocator.ITestAccessor
internal sealed class CachedGrainLocator : IGrainLocator, ILifecycleParticipant<ISiloLifecycle>, CachedGrainLocator.ITestAccessor
{
private readonly GrainDirectoryResolver grainDirectoryResolver;
private readonly IGrainDirectoryCache cache;
Expand Down
206 changes: 206 additions & 0 deletions src/Orleans.Runtime/GrainDirectory/DelegatingRemoteGrainDirectory.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,206 @@
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Orleans.GrainDirectory;

#nullable enable
namespace Orleans.Runtime.GrainDirectory;

/// <summary>
/// An <see cref="IRemoteGrainDirectory"/> implementation that delegates to <see cref="DistributedGrainDirectory"/>.
/// </summary>
/// <remarks>
/// <para>
/// This system target enables rolling upgrades from the legacy <see cref="LocalGrainDirectory"/> to the new
/// <see cref="DistributedGrainDirectory"/>. When an old silo using <see cref="LocalGrainDirectory"/> sends
/// a directory request to a new silo using <see cref="DistributedGrainDirectory"/>, the request is received
/// by this system target and forwarded to the <see cref="DistributedGrainDirectory"/>.
/// </para>
/// <para>
/// Unlike the previous <c>DelegatingGrainDirectoryPartition</c> approach, this implementation is fully async
/// and does not require blocking IO, since <see cref="IRemoteGrainDirectory"/> is an async interface.
/// </para>
/// <para>
/// This class is registered as an <see cref="ILifecycleParticipant{ISiloLifecycle}"/> and registers itself
/// in the activation directory during the <see cref="ServiceLifecycleStage.RuntimeInitialize"/> stage.
/// </para>
/// </remarks>
internal partial class DelegatingRemoteGrainDirectory : SystemTarget, IRemoteGrainDirectory, ILifecycleParticipant<ISiloLifecycle>
{
private readonly DistributedGrainDirectory _directory;
private readonly SystemTargetShared _shared;
private readonly ILogger _logger;

public DelegatingRemoteGrainDirectory(
DistributedGrainDirectory directory,
GrainType grainType,
SystemTargetShared shared) : base(grainType, shared)
{
_directory = directory;
_shared = shared;
_logger = shared.LoggerFactory.CreateLogger<DelegatingRemoteGrainDirectory>();
}

void ILifecycleParticipant<ISiloLifecycle>.Participate(ISiloLifecycle lifecycle)
{
lifecycle.Subscribe(
nameof(DelegatingRemoteGrainDirectory),
ServiceLifecycleStage.RuntimeInitialize,
OnRuntimeInitialize);
}

private Task OnRuntimeInitialize(CancellationToken cancellationToken)
{
// Register this system target in the activation directory so it can receive messages.
_shared.ActivationDirectory.RecordNewTarget(this);
LogDebugRegistered(GrainId);
return Task.CompletedTask;
}

public async Task<AddressAndTag> RegisterAsync(GrainAddress address, int hopCount = 0)
{
LogRegisterAsync(address, hopCount);
var result = await _directory.Register(address);
return ToAddressAndTag(result);
}

public async Task<AddressAndTag> RegisterAsync(GrainAddress address, GrainAddress? previousAddress, int hopCount = 0)
{
LogRegisterAsyncWithPrevious(address, previousAddress, hopCount);
var result = await _directory.Register(address, previousAddress);
return ToAddressAndTag(result);
}

public async Task<AddressAndTag> LookupAsync(GrainId grainId, int hopCount = 0)
{
LogLookupAsync(grainId, hopCount);
var result = await _directory.Lookup(grainId);
return ToAddressAndTag(result);
}

public async Task UnregisterAsync(GrainAddress address, UnregistrationCause cause, int hopCount = 0)
{
LogUnregisterAsync(address, cause, hopCount);
await _directory.Unregister(address);
}

public async Task UnregisterManyAsync(List<GrainAddress> addresses, UnregistrationCause cause, int hopCount = 0)
{
LogUnregisterManyAsync(addresses.Count, cause, hopCount);
foreach (var address in addresses)
{
await _directory.Unregister(address);
}
}

public async Task DeleteGrainAsync(GrainId grainId, int hopCount = 0)
{
LogDeleteGrainAsync(grainId, hopCount);
// Look up the grain first to get the full address, then unregister it
var address = await _directory.Lookup(grainId);
if (address is not null)
{
await _directory.Unregister(address);
}
}

public async Task RegisterMany(List<GrainAddress> addresses)
{
LogRegisterMany(addresses.Count);
foreach (var address in addresses)
{
await _directory.Register(address);
}
}

public async Task<List<AddressAndTag>> LookUpMany(List<(GrainId GrainId, int Version)> grainAndETagList)
{
LogLookUpMany(grainAndETagList.Count);
var result = new List<AddressAndTag>(grainAndETagList.Count);
foreach (var (grainId, version) in grainAndETagList)
{
var address = await _directory.Lookup(grainId);
var tag = address?.GetHashCode() ?? GrainInfo.NO_ETAG;

// If the version matches, return empty address (no update needed)
if (tag == version)
{
result.Add(new AddressAndTag(GrainAddress.GetAddress(null, grainId, default), tag));
}
else
{
result.Add(new AddressAndTag(address, tag));
}
}
return result;
}

public async Task AcceptSplitPartition(List<GrainAddress> singleActivations)
{
// During rolling upgrade, an old silo may try to hand off partition data to this silo.
// We accept these registrations by registering them in the DistributedGrainDirectory.
LogAcceptSplitPartition(singleActivations.Count);
foreach (var address in singleActivations)
{
await _directory.Register(address);
}
}

private static AddressAndTag ToAddressAndTag(GrainAddress? address)
{
return new AddressAndTag(address, address?.GetHashCode() ?? 0);
}

[LoggerMessage(Level = LogLevel.Debug, Message = "Registered DelegatingRemoteGrainDirectory system target: {GrainId}")]
private partial void LogDebugRegistered(GrainId grainId);

[LoggerMessage(Level = LogLevel.Trace, Message = "RegisterAsync: address={Address}, hopCount={HopCount}")]
private partial void LogRegisterAsync(GrainAddress address, int hopCount);

[LoggerMessage(Level = LogLevel.Trace, Message = "RegisterAsync: address={Address}, previousAddress={PreviousAddress}, hopCount={HopCount}")]
private partial void LogRegisterAsyncWithPrevious(GrainAddress address, GrainAddress? previousAddress, int hopCount);

[LoggerMessage(Level = LogLevel.Trace, Message = "LookupAsync: grainId={GrainId}, hopCount={HopCount}")]
private partial void LogLookupAsync(GrainId grainId, int hopCount);

[LoggerMessage(Level = LogLevel.Trace, Message = "UnregisterAsync: address={Address}, cause={Cause}, hopCount={HopCount}")]
private partial void LogUnregisterAsync(GrainAddress address, UnregistrationCause cause, int hopCount);

[LoggerMessage(Level = LogLevel.Trace, Message = "UnregisterManyAsync: count={Count}, cause={Cause}, hopCount={HopCount}")]
private partial void LogUnregisterManyAsync(int count, UnregistrationCause cause, int hopCount);

[LoggerMessage(Level = LogLevel.Trace, Message = "DeleteGrainAsync: grainId={GrainId}, hopCount={HopCount}")]
private partial void LogDeleteGrainAsync(GrainId grainId, int hopCount);

[LoggerMessage(Level = LogLevel.Trace, Message = "RegisterMany: count={Count}")]
private partial void LogRegisterMany(int count);

[LoggerMessage(Level = LogLevel.Trace, Message = "LookUpMany: count={Count}")]
private partial void LogLookUpMany(int count);

[LoggerMessage(Level = LogLevel.Debug, Message = "AcceptSplitPartition: accepting {Count} entries from old silo during rolling upgrade")]
private partial void LogAcceptSplitPartition(int count);
}

/// <summary>
/// The <see cref="IRemoteGrainDirectory"/> system target that handles directory service requests from old silos.
/// </summary>
internal sealed class DelegatingDirectoryService : DelegatingRemoteGrainDirectory
{
public DelegatingDirectoryService(DistributedGrainDirectory directory, SystemTargetShared shared)
: base(directory, Constants.DirectoryServiceType, shared)
{
}
}

/// <summary>
/// The <see cref="IRemoteGrainDirectory"/> system target that handles cache validation requests from old silos.
/// </summary>
internal sealed class DelegatingCacheValidator : DelegatingRemoteGrainDirectory
{
public DelegatingCacheValidator(DistributedGrainDirectory directory, SystemTargetShared shared)
: base(directory, Constants.DirectoryCacheValidatorType, shared)
{
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ internal sealed partial class DistributedGrainDirectory : SystemTarget, IGrainDi
private readonly IServiceProvider _serviceProvider;
private readonly ImmutableArray<GrainDirectoryPartition> _partitions;
private readonly CancellationTokenSource _stoppedCts = new();
private readonly ActivationDirectory _localActivations;

internal CancellationToken OnStoppedToken => _stoppedCts.Token;
internal ClusterMembershipSnapshot ClusterMembershipSnapshot => _membershipService.CurrentView.ClusterMembershipSnapshot;
Expand All @@ -77,7 +78,6 @@ internal sealed partial class DistributedGrainDirectory : SystemTarget, IGrainDi
// for each recovery version.
private long _recoveryMembershipVersion;
private Task _runTask = Task.CompletedTask;
private ActivationDirectory _localActivations;
private GrainDirectoryResolver? _grainDirectoryResolver;

public DistributedGrainDirectory(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,19 @@ internal sealed partial class GrainDirectoryHandoffManager
private readonly ISiloStatusOracle siloStatusOracle;
private readonly IInternalGrainFactory grainFactory;
private readonly ILogger logger;
private readonly Factory<LocalGrainDirectoryPartition> createPartion;
private readonly Queue<(string name, object state, Func<GrainDirectoryHandoffManager, object, Task> action)> pendingOperations = new();
private readonly AsyncLock executorLock = new AsyncLock();

internal GrainDirectoryHandoffManager(
LocalGrainDirectory localDirectory,
ISiloStatusOracle siloStatusOracle,
IInternalGrainFactory grainFactory,
Factory<LocalGrainDirectoryPartition> createPartion,
ILoggerFactory loggerFactory)
{
logger = loggerFactory.CreateLogger<GrainDirectoryHandoffManager>();
this.localDirectory = localDirectory;
this.siloStatusOracle = siloStatusOracle;
this.grainFactory = grainFactory;
this.createPartion = createPartion;
}

internal void ProcessSiloAddEvent(SiloAddress addedSilo)
Expand Down
Loading
Loading