Skip to content

Commit 5d23709

Browse files
committed
Clean up
1 parent ab2801c commit 5d23709

File tree

4 files changed

+86
-123
lines changed

4 files changed

+86
-123
lines changed

src/Orleans.Runtime/Catalog/Catalog.cs

Lines changed: 0 additions & 102 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,7 @@ namespace Orleans.Runtime
1212
{
1313
internal sealed partial class Catalog : SystemTarget, ICatalog, ILifecycleParticipant<ISiloLifecycle>, ISiloStatusListener
1414
{
15-
private readonly SiloAddress _siloAddress;
1615
private readonly ActivationCollector activationCollector;
17-
private readonly GrainDirectoryResolver grainDirectoryResolver;
1816
private readonly ActivationDirectory activations;
1917
private readonly IServiceProvider serviceProvider;
2018
private readonly ILogger logger;
@@ -27,8 +25,6 @@ internal sealed partial class Catalog : SystemTarget, ICatalog, ILifecyclePartic
2725
private readonly object[] _locks = new object[LockCount];
2826

2927
public Catalog(
30-
ILocalSiloDetails localSiloDetails,
31-
GrainDirectoryResolver grainDirectoryResolver,
3228
ActivationDirectory activationDirectory,
3329
ActivationCollector activationCollector,
3430
IServiceProvider serviceProvider,
@@ -37,8 +33,6 @@ public Catalog(
3733
SystemTargetShared shared)
3834
: base(Constants.CatalogType, shared)
3935
{
40-
this._siloAddress = localSiloDetails.SiloAddress;
41-
this.grainDirectoryResolver = grainDirectoryResolver;
4236
this.activations = activationDirectory;
4337
this.serviceProvider = serviceProvider;
4438
this.grainActivator = grainActivator;
@@ -98,19 +92,6 @@ public void UnregisterMessageTarget(IGrainContext activation)
9892
}
9993
}
10094

101-
/// <summary>
102-
/// FOR TESTING PURPOSES ONLY!!
103-
/// </summary>
104-
/// <param name="grain"></param>
105-
internal int UnregisterGrainForTesting(GrainId grain)
106-
{
107-
var activation = activations.FindTarget(grain);
108-
if (activation is null) return 0;
109-
110-
UnregisterMessageTarget(activation);
111-
return 1;
112-
}
113-
11495
/// <summary>
11596
/// If activation already exists, return it.
11697
/// Otherwise, creates a new activation, begins rehydrating it and activating it, then returns it.
@@ -310,96 +291,13 @@ void ISiloStatusListener.SiloStatusChangeNotification(SiloAddress updatedSilo, S
310291
}
311292
}
312293

313-
// TODO move this logic in the LocalGrainDirectory
314-
internal void OnSiloStatusChange(ILocalGrainDirectory directory, SiloAddress updatedSilo, SiloStatus status)
315-
{
316-
// ignore joining events and also events on myself.
317-
if (updatedSilo.Equals(_siloAddress)) return;
318-
319-
// We deactivate those activations when silo goes either of ShuttingDown/Stopping/Dead states,
320-
// since this is what Directory is doing as well. Directory removes a silo based on all those 3 statuses,
321-
// thus it will only deliver a "remove" notification for a given silo once to us. Therefore, we need to react the fist time we are notified.
322-
// We may review the directory behavior in the future and treat ShuttingDown differently ("drain only") and then this code will have to change a well.
323-
if (!status.IsTerminating()) return;
324-
325-
var activationsToShutdown = new List<IGrainContext>();
326-
try
327-
{
328-
// scan all activations in activation directory and deactivate the ones that the removed silo is their primary partition owner.
329-
// Note: No lock needed here since ActivationDirectory uses ConcurrentDictionary which provides thread-safe enumeration
330-
foreach (var activation in activations)
331-
{
332-
try
333-
{
334-
var activationData = activation.Value;
335-
var placementStrategy = activationData.GetComponent<PlacementStrategy>();
336-
var isUsingGrainDirectory = placementStrategy is { IsUsingGrainDirectory: true };
337-
if (!isUsingGrainDirectory || !grainDirectoryResolver.IsUsingDhtGrainDirectory(activationData.GrainId.Type)) continue;
338-
if (!updatedSilo.Equals(directory.GetPrimaryForGrain(activationData.GrainId))) continue;
339-
340-
activationsToShutdown.Add(activationData);
341-
}
342-
catch (Exception exc)
343-
{
344-
LogErrorCatalogSiloStatusChangeNotification(new(updatedSilo), exc);
345-
}
346-
}
347-
348-
if (activationsToShutdown.Count > 0)
349-
{
350-
LogInfoCatalogSiloStatusChangeNotification(activationsToShutdown.Count, new(updatedSilo));
351-
}
352-
}
353-
finally
354-
{
355-
// outside the lock.
356-
if (activationsToShutdown.Count > 0)
357-
{
358-
var reasonText = $"This activation is being deactivated due to a failure of server {updatedSilo}, since it was responsible for this activation's grain directory registration.";
359-
var reason = new DeactivationReason(DeactivationReasonCode.DirectoryFailure, reasonText);
360-
StartDeactivatingActivations(reason, activationsToShutdown, CancellationToken.None);
361-
}
362-
}
363-
364-
void StartDeactivatingActivations(DeactivationReason reason, List<IGrainContext> list, CancellationToken cancellationToken)
365-
{
366-
if (list == null || list.Count == 0) return;
367-
368-
LogDebugDeactivateActivations(list.Count);
369-
370-
foreach (var activation in list)
371-
{
372-
activation.Deactivate(reason, cancellationToken);
373-
}
374-
}
375-
}
376-
377294
void ILifecycleParticipant<ISiloLifecycle>.Participate(ISiloLifecycle lifecycle)
378295
{
379296
// Do nothing, just ensure that this instance is created so that it can register itself in the activation directory.
380297
_siloStatusOracle = serviceProvider.GetRequiredService<ISiloStatusOracle>();
381298
_siloStatusOracle.SubscribeToSiloStatusEvents(this);
382299
}
383300

384-
private readonly struct SiloAddressLogValue(SiloAddress silo)
385-
{
386-
public override string ToString() => silo.ToStringWithHashCode();
387-
}
388-
389-
[LoggerMessage(
390-
Level = LogLevel.Error,
391-
EventId = (int)ErrorCode.Catalog_SiloStatusChangeNotification_Exception,
392-
Message = "Catalog has thrown an exception while handling removal of silo {Silo}"
393-
)]
394-
private partial void LogErrorCatalogSiloStatusChangeNotification(SiloAddressLogValue silo, Exception exception);
395-
396-
[LoggerMessage(
397-
Level = LogLevel.Information,
398-
EventId = (int)ErrorCode.Catalog_SiloStatusChangeNotification,
399-
Message = "Catalog is deactivating {Count} activations due to a failure of silo {Silo}, since it is a primary directory partition to these grain ids."
400-
)]
401-
private partial void LogInfoCatalogSiloStatusChangeNotification(int count, SiloAddressLogValue silo);
402-
403301
[LoggerMessage(
404302
Level = LogLevel.Trace,
405303
Message = "Unregistered activation {Activation}")]

src/Orleans.Runtime/GrainDirectory/LocalGrainDirectory.cs

Lines changed: 86 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,8 @@
33
using System.Collections.Immutable;
44
using System.ComponentModel;
55
using System.Diagnostics.CodeAnalysis;
6+
using System.Threading;
67
using System.Threading.Tasks;
7-
using Microsoft.Extensions.DependencyInjection;
88
using Microsoft.Extensions.Logging;
99
using Microsoft.Extensions.Options;
1010
using Orleans.Configuration;
@@ -20,15 +20,15 @@ internal sealed partial class LocalGrainDirectory : ILocalGrainDirectory, ISiloS
2020
private readonly SiloAddress? seed;
2121
private readonly ISiloStatusOracle siloStatusOracle;
2222
private readonly IInternalGrainFactory grainFactory;
23+
private readonly ActivationDirectory activations;
24+
private readonly GrainDirectoryResolver grainDirectoryResolver;
2325
private readonly object writeLock = new object();
24-
private readonly IServiceProvider _serviceProvider;
2526
private DirectoryMembership directoryMembership = DirectoryMembership.Default;
2627

2728
// Consider: move these constants into an appropriate place
2829
internal const int HOP_LIMIT = 6; // forward a remote request no more than 5 times
2930
public static readonly TimeSpan RETRY_DELAY = TimeSpan.FromMilliseconds(200); // Pause 200ms between forwards to let the membership directory settle down
3031
internal bool Running;
31-
private Catalog? _catalog;
3232

3333
internal SiloAddress MyAddress { get; }
3434

@@ -45,6 +45,8 @@ public LocalGrainDirectory(
4545
ILocalSiloDetails siloDetails,
4646
ISiloStatusOracle siloStatusOracle,
4747
IInternalGrainFactory grainFactory,
48+
ActivationDirectory activationDirectory,
49+
GrainDirectoryResolver grainDirectoryResolver,
4850
IOptions<DevelopmentClusterMembershipOptions> developmentClusterMembershipOptions,
4951
IOptions<GrainDirectoryOptions> grainDirectoryOptions,
5052
ILoggerFactory loggerFactory,
@@ -57,6 +59,8 @@ public LocalGrainDirectory(
5759

5860
this.siloStatusOracle = siloStatusOracle;
5961
this.grainFactory = grainFactory;
62+
this.activations = activationDirectory;
63+
this.grainDirectoryResolver = grainDirectoryResolver;
6064

6165
DirectoryCache = GrainDirectoryCacheFactory.CreateGrainDirectoryCache(serviceProvider, grainDirectoryOptions.Value);
6266

@@ -84,7 +88,6 @@ public LocalGrainDirectory(
8488
return ring.Count == 0 ? 0 : ((float)100 / (float)ring.Count);
8589
});
8690
DirectoryInstruments.RegisterRingSizeObserve(() => this.directoryMembership.MembershipRingList.Count);
87-
_serviceProvider = serviceProvider;
8891
}
8992

9093
public void Start()
@@ -151,26 +154,21 @@ private void AddServer(SiloAddress silo)
151154

152155
private void RemoveServer(SiloAddress silo, SiloStatus status)
153156
{
157+
List<IGrainContext>? activationsToShutdown = null;
158+
154159
lock (this.writeLock)
155160
{
156-
try
157-
{
158-
// Only notify the catalog once. Order is important: call BEFORE updating membershipRingList.
159-
_catalog = _serviceProvider.GetRequiredService<Catalog>();
160-
_catalog.OnSiloStatusChange(this, silo, status);
161-
}
162-
catch (Exception exc)
163-
{
164-
LogErrorCatalogSiloStatusChangeNotificationException(exc, new(silo));
165-
}
166-
167161
var existing = this.directoryMembership;
168162
if (!existing.MembershipCache.Contains(silo))
169163
{
170164
// we have already removed this silo
171165
return;
172166
}
173167

168+
// Collect activations to deactivate BEFORE updating membershipRingList,
169+
// since GetPrimaryForGrain depends on the current ring membership.
170+
activationsToShutdown = CollectActivationsToDeactivate(silo);
171+
174172
this.directoryMembership = new DirectoryMembership(
175173
existing.MembershipRingList.Remove(silo),
176174
existing.MembershipCache.Remove(silo));
@@ -180,6 +178,64 @@ private void RemoveServer(SiloAddress silo, SiloStatus status)
180178

181179
LogDebugSiloRemovedSilo(MyAddress, silo);
182180
}
181+
182+
// Deactivate activations outside the lock
183+
if (activationsToShutdown is { Count: > 0 })
184+
{
185+
var reasonText = $"This activation is being deactivated due to a failure of server {silo}, since it was responsible for this activation's grain directory registration.";
186+
var reason = new DeactivationReason(DeactivationReasonCode.DirectoryFailure, reasonText);
187+
DeactivateActivations(reason, activationsToShutdown);
188+
}
189+
}
190+
191+
/// <summary>
192+
/// Collects activations that should be deactivated because the removed silo was their primary directory partition owner.
193+
/// </summary>
194+
private List<IGrainContext> CollectActivationsToDeactivate(SiloAddress removedSilo)
195+
{
196+
var result = new List<IGrainContext>();
197+
try
198+
{
199+
// Scan all activations and find those that the removed silo is their primary partition owner.
200+
foreach (var activation in activations)
201+
{
202+
try
203+
{
204+
var activationData = activation.Value;
205+
var placementStrategy = activationData.GetComponent<PlacementStrategy>();
206+
var isUsingGrainDirectory = placementStrategy is { IsUsingGrainDirectory: true };
207+
if (!isUsingGrainDirectory || !grainDirectoryResolver.IsUsingDhtGrainDirectory(activationData.GrainId.Type)) continue;
208+
if (!removedSilo.Equals(GetPrimaryForGrain(activationData.GrainId))) continue;
209+
210+
result.Add(activationData);
211+
}
212+
catch (Exception exc)
213+
{
214+
LogErrorCollectActivationsToDeactivate(new(removedSilo), exc);
215+
}
216+
}
217+
218+
if (result.Count > 0)
219+
{
220+
LogInfoDeactivatingActivationsDueToSiloFailure(result.Count, new(removedSilo));
221+
}
222+
}
223+
catch (Exception exc)
224+
{
225+
LogErrorCollectActivationsToDeactivate(new(removedSilo), exc);
226+
}
227+
228+
return result;
229+
}
230+
231+
private void DeactivateActivations(DeactivationReason reason, List<IGrainContext> activations)
232+
{
233+
LogDebugDeactivateActivations(activations.Count);
234+
235+
foreach (var activation in activations)
236+
{
237+
activation.Deactivate(reason, CancellationToken.None);
238+
}
183239
}
184240

185241
/// <summary>
@@ -826,9 +882,22 @@ private readonly struct SiloHashLogValue(SiloAddress? silo)
826882
[LoggerMessage(
827883
EventId = (int)ErrorCode.Directory_SiloStatusChangeNotification_Exception,
828884
Level = LogLevel.Error,
829-
Message = "CatalogSiloStatusListener.SiloStatusChangeNotification has thrown an exception when notified about removed silo {Silo}."
885+
Message = "Exception while collecting activations to deactivate after removal of silo {Silo}."
886+
)]
887+
private partial void LogErrorCollectActivationsToDeactivate(SiloAddressLogValue silo, Exception exception);
888+
889+
[LoggerMessage(
890+
EventId = (int)ErrorCode.Catalog_SiloStatusChangeNotification,
891+
Level = LogLevel.Information,
892+
Message = "Deactivating {Count} activations due to failure of silo {Silo}, since it was the primary directory partition for these grain ids."
893+
)]
894+
private partial void LogInfoDeactivatingActivationsDueToSiloFailure(int count, SiloAddressLogValue silo);
895+
896+
[LoggerMessage(
897+
Level = LogLevel.Debug,
898+
Message = "DeactivateActivations: {Count} activations."
830899
)]
831-
private partial void LogErrorCatalogSiloStatusChangeNotificationException(Exception exception, SiloAddressLogValue silo);
900+
private partial void LogDebugDeactivateActivations(int count);
832901

833902
[LoggerMessage(
834903
Level = LogLevel.Debug,

src/Orleans.Runtime/Silo/TestHooks/ITestHooksSystemTarget.cs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,6 @@ internal interface ITestHooks
1010
Task<string> GetConsistentRingProviderDiagnosticInfo();
1111
Task<string> GetServiceId();
1212
Task<bool> HasStorageProvider(string providerName);
13-
Task<bool> HasStreamProvider(string providerName);
14-
Task<int> UnregisterGrainForTesting(GrainId grain);
1513
Task<Dictionary<SiloAddress, SiloStatus>> GetApproximateSiloStatuses();
1614
}
1715

src/Orleans.Runtime/Silo/TestHooks/TestHooksSystemTarget.cs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -85,8 +85,6 @@ public Task<bool> HasStreamProvider(string providerName)
8585
return Task.FromResult(this.serviceProvider.GetKeyedService<IGrainStorage>(providerName) != null);
8686
}
8787

88-
public Task<int> UnregisterGrainForTesting(GrainId grain) => Task.FromResult(this.serviceProvider.GetRequiredService<Catalog>().UnregisterGrainForTesting(grain));
89-
9088
public Task<Dictionary<SiloAddress, SiloStatus>> GetApproximateSiloStatuses() => Task.FromResult(this.siloStatusOracle.GetApproximateSiloStatuses());
9189
}
9290
}

0 commit comments

Comments
 (0)