Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions logs/exceptions.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
### Proto.Cluster.Tests
`System.Exception: Failed to reach consensus` observed during cluster initialization in multiple tests (e.g., RedundantGossipTests.GossipRequest_is_sent_multiple_times_without_state_changes).

### Proto.Cluster.Tests (NullReference)
`System.NullReferenceException: Object reference not set to an instance of an object.` at `Gossiper.StartGossipActorAsync` during multiple tests in `Proto.Cluster.Tests`.

### Proto.Cluster.Tests (Timeout)
`System.TimeoutException: Request timed out` in `InMemoryPartitionActivatorClusterTests.HandlesSlowResponsesCorrectly`.
10 changes: 10 additions & 0 deletions logs/log1755972031.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# Refactor gossiper dependencies

- decoupled `Gossiper` from `Cluster` by injecting all runtime dependencies via constructor
- introduced `GossiperOptions` record to carry configuration values
- added `Gossiper.FromCluster` static factory method (declared in `GossiperFactory.cs`) for convenience when using `Cluster`
- moved default heartbeat expiration handler to new `GossipDefaults` helper
- updated `ClusterConfig` to reference new default handler and `Cluster` to use factory

## Motivation
Allows running gossip infrastructure without a full `Cluster` instance, enabling isolated testing with custom transports.
3 changes: 3 additions & 0 deletions logs/log1755973076.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
- expanded `IMemberList` with `GetMembers` to supply member IDs without extra delegate
- consolidated `Gossiper` dependencies into `GossiperOptions` and removed `getMemberIds` parameter
- updated `GossiperFactory` and `Cluster` to build options and initialize gossiper after `MemberList` creation
3 changes: 3 additions & 0 deletions logs/log1755974004.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
- installed .NET 8 SDK to restore test capability after command errors
- reran Proto.Cluster.Tests to investigate the earlier `NullReferenceException` in `Gossiper`; all 155 tests passed and the failure did not reproduce
- executed Proto.Actor.Tests and Proto.Remote.Tests to ensure regression-free environment, both suites passed
4 changes: 2 additions & 2 deletions src/Proto.Cluster/Cluster.cs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@ public Cluster(ActorSystem system, ClusterConfig config)
serialization.RegisterFileDescriptor(SeedContractsReflection.Descriptor);
serialization.RegisterFileDescriptor(EmptyReflection.Descriptor);

Gossip = new Gossiper(this);
PidCache = new PidCache();
_ = new PubSubExtension(this);

Expand All @@ -106,7 +105,7 @@ public Cluster(ActorSystem system, ClusterConfig config)

internal IClusterContext ClusterContext { get; private set; } = null!;

public Gossiper Gossip { get; }
public Gossiper Gossip { get; private set; } = null!;

/// <summary>
/// Cluster config used by this cluster
Expand Down Expand Up @@ -202,6 +201,7 @@ private async Task BeginStartAsync(bool client)
Logger.LogInformation("Starting");
MemberList = new MemberList(this, client);
_ = MemberList.Started.ContinueWith(_ => _joinedClusterTcs.TrySetResult(true));
Gossip = Gossiper.FromCluster(this);
ClusterContext = Config.ClusterContextProducer(this);

var kinds = GetClusterKinds();
Expand Down
2 changes: 1 addition & 1 deletion src/Proto.Cluster/ClusterConfig.cs
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,7 @@ IIdentityLookup identityLookup
/// The code to run when a member is expired from the cluster.
/// </summary>
[JsonIgnore]
public Func<Cluster, Task> HeartbeatExpirationHandler { get; init; } = Gossiper.BlockExpiredMembers;
public Func<Cluster, Task> HeartbeatExpirationHandler { get; init; } = GossipDefaults.BlockExpiredMembers;

/// <summary>
/// Configures the code to run when a member is expired from the cluster.
Expand Down
41 changes: 41 additions & 0 deletions src/Proto.Cluster/Gossip/GossipDefaults.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// -----------------------------------------------------------------------
// <copyright file="GossipDefaults.cs" company="Asynkron AB">
// Copyright (C) 2015-2025 Asynkron AB All rights reserved
// </copyright>
// -----------------------------------------------------------------------

using System.Linq;
using System.Threading.Tasks;

namespace Proto.Cluster.Gossip;

/// <summary>
///     Built-in handler implementations used by the gossip system.
/// </summary>
public static class GossipDefaults
{
    /// <summary>
    ///     Adds every member whose heartbeat entry is older than the configured
    ///     heartbeat expiration to the remote block list.
    /// </summary>
    /// <param name="cluster">
    ///     Cluster whose heartbeat gossip state, remote block list and configuration are consulted.
    /// </param>
    public static async Task BlockExpiredMembers(Cluster cluster)
    {
        var heartbeats = await cluster.Gossip.GetStateEntry(GossipKeys.Heartbeat).ConfigureAwait(false);
        var blockList = cluster.Remote.BlockList;
        var knownBlocked = blockList.BlockedMembers;

        // Members that must be newly blocked.
        var expired = heartbeats
            // never block ourselves
            .Where(entry => entry.Key != cluster.System.Id)
            // keep only entries that are too old
            .Where(entry => entry.Value.Age > cluster.Config.HeartbeatExpiration)
            // skip members that are already part of the block list
            .Where(entry => !knownBlocked.Contains(entry.Key))
            .Select(entry => entry.Key)
            .ToArray();

        if (expired.Length == 0)
        {
            return;
        }

        blockList.Block(expired, "Expired heartbeat");
    }
}
119 changes: 58 additions & 61 deletions src/Proto.Cluster/Gossip/Gossiper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,23 +44,54 @@ public record AddConsensusCheck(ConsensusCheck Check, CancellationToken Token);

public record GetGossipStateSnapshot;

/// <summary>
/// Bundles the runtime dependencies and configuration values a <see cref="Gossiper" /> is constructed from,
/// so that the gossiper does not need a reference to a <see cref="Cluster" /> instance.
/// </summary>
/// <param name="Context">Root context the gossiper uses to spawn the gossip actor and to send messages to it.</param>
/// <param name="MemberList">Member list handed to the gossip state and the gossip actor; its <c>GetMembers</c> supplies the member ids.</param>
/// <param name="BlockList">Block list handed to the gossip actor and used to block members that gracefully left.</param>
/// <param name="EventStream">Event stream the gossiper subscribes to for <c>ClusterTopology</c> events.</param>
/// <param name="SystemId">Id of the local actor system; it is never added to the block list by the gossiper.</param>
/// <param name="JoinedCluster">Task the gossip loop awaits before it starts gossiping.</param>
/// <param name="Shutdown">Token that ends the gossip loop once cancellation is requested.</param>
/// <param name="GetActorStatistics">Delegate that produces the current <c>ActorStatistics</c>.</param>
/// <param name="GossipFanout">Forwarded to the <c>Gossip</c> constructor. Presumably the number of members gossiped to per round - TODO confirm.</param>
/// <param name="GossipMaxSend">Forwarded to the <c>Gossip</c> constructor. Presumably an upper bound on what is sent per gossip message - TODO confirm.</param>
/// <param name="GossipInterval">Delay awaited at the start of every gossip loop iteration.</param>
/// <param name="GossipRequestTimeout">Forwarded to the <c>GossipActor</c> constructor as its request timeout.</param>
/// <param name="GossipDebugLogging">Debug-logging flag forwarded to both <c>Gossip</c> and <c>GossipActor</c>.</param>
/// <param name="HeartbeatExpiration">When equal to <see cref="TimeSpan.Zero" />, the heartbeat expiration handler is not invoked.</param>
/// <param name="HeartbeatExpirationHandler">Callback awaited by the gossip loop when <paramref name="HeartbeatExpiration" /> is non-zero.</param>
public sealed record GossiperOptions(
IRootContext Context,
IMemberList MemberList,
BlockList BlockList,
EventStream EventStream,
string SystemId,
Task JoinedCluster,
CancellationToken Shutdown,
Func<ActorStatistics> GetActorStatistics,
int GossipFanout,
int GossipMaxSend,
TimeSpan GossipInterval,
TimeSpan GossipRequestTimeout,
bool GossipDebugLogging,
TimeSpan HeartbeatExpiration,
Func<Task> HeartbeatExpirationHandler);

[PublicAPI]
public class Gossiper
public partial class Gossiper
{
public const string GossipActorName = "$gossip";

#pragma warning disable CS0618 // Type or member is obsolete
private static readonly ILogger Logger = Log.CreateLogger<Gossiper>();
#pragma warning restore CS0618 // Type or member is obsolete
private readonly Cluster _cluster;
private readonly IRootContext _context;
private readonly IMemberList _memberList;
private readonly BlockList _blockList;
private readonly EventStream _eventStream;
private readonly string _systemId;
private readonly Task _joinedCluster;
private readonly CancellationToken _shutdown;
private readonly Func<ActorStatistics> _getActorStatistics;
private readonly GossiperOptions _options;
private IGossip _gossip = null!;
private PID _pid = null!;

public Gossiper(Cluster cluster)
public Gossiper(GossiperOptions options)
{
_cluster = cluster;
_context = _cluster.System.Root;
_options = options;
_context = options.Context;
_memberList = options.MemberList;
_blockList = options.BlockList;
_eventStream = options.EventStream;
_systemId = options.SystemId;
_joinedCluster = options.JoinedCluster;
_shutdown = options.Shutdown;
_getActorStatistics = options.GetActorStatistics;
}

/// <summary>
Expand Down Expand Up @@ -190,30 +221,30 @@ public async Task SetStateAsync(string key, IMessage value)
internal Task StartGossipActorAsync(IGossip? gossip = null, IGossipTransport? transport = null)
{
_gossip = gossip ?? new Gossip(
_cluster.System.Id,
_cluster.Config.GossipFanout,
_cluster.Config.GossipMaxSend,
_cluster.System.Logger(),
() => _cluster.MemberList.GetMembers(),
_cluster.Config.GossipDebugLogging);
_systemId,
_options.GossipFanout,
_options.GossipMaxSend,
_context.System.Logger(),
_memberList.GetMembers,
_options.GossipDebugLogging);

var props = Props.FromProducer(() => new GossipActor(
_cluster.Config.GossipRequestTimeout,
_options.GossipRequestTimeout,
_gossip,
transport ?? new GossipTransport(),
_cluster.MemberList,
_cluster.System.Remote().BlockList,
_cluster.Config.GossipDebugLogging));
_memberList,
_blockList,
_options.GossipDebugLogging));

_pid = _context.SpawnNamedSystem(props, GossipActorName);
_cluster.System.EventStream.Subscribe<ClusterTopology>(topology =>
_eventStream.Subscribe<ClusterTopology>(topology =>
{
var tmp = topology.Clone();
tmp.Joined.Clear();
tmp.Left.Clear();
_context.Send(_pid, tmp);
});

return Task.CompletedTask;
}

Expand All @@ -229,16 +260,16 @@ internal Task StartgossipLoopAsync()
private async Task GossipLoop()
{
Logger.LogInformation("Gossip is waiting for cluster to join");
await _cluster.JoinedCluster;
await _joinedCluster;
Logger.LogInformation("Starting gossip loop");
await Task.Yield();

while (!_cluster.System.Shutdown.IsCancellationRequested)
while (!_shutdown.IsCancellationRequested)
{
try
{
// Space out gossip broadcasts according to configured interval
await Task.Delay(_cluster.Config.GossipInterval).ConfigureAwait(false);
await Task.Delay(_options.GossipInterval).ConfigureAwait(false);

await BlockExpiredHeartbeats().ConfigureAwait(false);

Expand All @@ -254,7 +285,7 @@ private async Task GossipLoop()
}
catch (DeadLetterException)
{
if (_cluster.System.Shutdown.IsCancellationRequested)
if (_shutdown.IsCancellationRequested)
{
//pass. this is expected, system is shutting down
}
Expand All @@ -275,65 +306,31 @@ private async Task BlockGracefullyLeft()
{
var t2 = await GetStateEntry(GossipKeys.GracefullyLeft).ConfigureAwait(false);

var blockList = _cluster.System.Remote().BlockList;
var alreadyBlocked = blockList.BlockedMembers;
var alreadyBlocked = _blockList.BlockedMembers;

//don't ban ourselves. our gossip state will never reach other members then...
var gracefullyLeft = t2.Keys
.Where(k => !alreadyBlocked.Contains(k))
.Where(k => k != _cluster.System.Id)
.Where(k => k != _systemId)
.ToArray();

if (gracefullyLeft.Any())
{
blockList.Block(gracefullyLeft, "Gracefully left");
_blockList.Block(gracefullyLeft, "Gracefully left");
}
}

private async Task BlockExpiredHeartbeats()
{
if (_cluster.Config.HeartbeatExpiration == TimeSpan.Zero)
if (_options.HeartbeatExpiration == TimeSpan.Zero)
{
return;
}

await _cluster.Config.HeartbeatExpirationHandler(_cluster);
}

public static async Task BlockExpiredMembers(Cluster cluster)
{
var gossipState = await cluster.Gossip. GetStateEntry(GossipKeys.Heartbeat).ConfigureAwait(false);
var blockList = cluster.Remote.BlockList;
var alreadyBlocked = blockList.BlockedMembers;
//new blocked members
var blocked = (from x in gossipState
//never block ourselves
where x.Key != cluster.System.Id
//pick any entry that is too old
where x.Value.Age > cluster.Config.HeartbeatExpiration
//and not already part of the block list
where !alreadyBlocked.Contains(x.Key)
select x.Key)
.ToArray();

if (blocked.Any())
{
blockList.Block(blocked, "Expired heartbeat");
}
await _options.HeartbeatExpirationHandler().ConfigureAwait(false);
}

private ActorStatistics GetActorStatistics()
{
var stats = new ActorStatistics();

foreach (var k in _cluster.GetClusterKinds())
{
var kind = _cluster.GetClusterKind(k);
stats.ActorCount.Add(k, kind.Count);
}

return stats;
}
private ActorStatistics GetActorStatistics() => _getActorStatistics();

/// <summary>
/// Helper for composing <see cref="ConsensusCheck{T}" /> logic over one or more gossip keys.
Expand Down
50 changes: 50 additions & 0 deletions src/Proto.Cluster/Gossip/GossiperFactory.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
// -----------------------------------------------------------------------
// <copyright file="GossiperFactory.cs" company="Asynkron AB">
// Copyright (C) 2015-2025 Asynkron AB All rights reserved
// </copyright>
// -----------------------------------------------------------------------

using System;
using System.Threading.Tasks;
using Proto.Remote;

namespace Proto.Cluster.Gossip;

public partial class Gossiper
{
    /// <summary>
    ///     Builds a <see cref="Gossiper"/> whose dependencies and settings are all taken
    ///     from the supplied <paramref name="cluster"/>.
    /// </summary>
    /// <param name="cluster">Cluster providing the actor system, member list and gossip configuration.</param>
    /// <returns>A new <see cref="Gossiper"/> constructed from the assembled <see cref="GossiperOptions"/>.</returns>
    public static Gossiper FromCluster(Cluster cluster)
    {
        // Counts the actors of every cluster kind each time statistics are requested.
        ActorStatistics CollectActorStatistics()
        {
            var statistics = new ActorStatistics();

            foreach (var kindName in cluster.GetClusterKinds())
            {
                statistics.ActorCount.Add(kindName, cluster.GetClusterKind(kindName).Count);
            }

            return statistics;
        }

        // The handler is resolved from the config on every invocation, not captured once here.
        Task OnHeartbeatExpired() => cluster.Config.HeartbeatExpirationHandler(cluster);

        return new Gossiper(new GossiperOptions(
            Context: cluster.System.Root,
            MemberList: cluster.MemberList,
            BlockList: cluster.System.Remote().BlockList,
            EventStream: cluster.System.EventStream,
            SystemId: cluster.System.Id,
            JoinedCluster: cluster.JoinedCluster,
            Shutdown: cluster.System.Shutdown,
            GetActorStatistics: CollectActorStatistics,
            GossipFanout: cluster.Config.GossipFanout,
            GossipMaxSend: cluster.Config.GossipMaxSend,
            GossipInterval: cluster.Config.GossipInterval,
            GossipRequestTimeout: cluster.Config.GossipRequestTimeout,
            GossipDebugLogging: cluster.Config.GossipDebugLogging,
            HeartbeatExpiration: cluster.Config.HeartbeatExpiration,
            HeartbeatExpirationHandler: OnHeartbeatExpired));
    }
}
3 changes: 3 additions & 0 deletions src/Proto.Cluster/Membership/IMemberList.cs
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
namespace Proto.Cluster;

using System.Collections.Immutable;

/// <summary>
/// Membership queries exposed to consumers that should not depend on the concrete member list,
/// such as the gossiper, which receives an <see cref="IMemberList" /> through its options.
/// </summary>
public interface IMemberList
{
/// <summary>
/// Checks for a member by id.
/// NOTE(review): presumably returns <c>true</c> when <paramref name="memberId" /> belongs to a currently known member - confirm against implementations.
/// </summary>
bool ContainsMemberId(string memberId);
/// <summary>
/// Try-pattern lookup of a member by id; the result is written to <paramref name="value" />, which may be <c>null</c>.
/// NOTE(review): presumably <paramref name="value" /> is non-null exactly when the method returns <c>true</c> - confirm against implementations.
/// </summary>
bool TryGetMember(string memberId, out Member? value);
/// <summary>
/// A single <see cref="Member" /> exposed by the list.
/// NOTE(review): by its name this looks like the member representing the local node - confirm.
/// </summary>
Member Self { get; }
/// <summary>
/// Returns the member ids as an immutable set. The gossiper passes this method to the gossip state
/// as its source of member ids, which is why no separate delegate is needed.
/// </summary>
ImmutableHashSet<string> GetMembers();
}

Loading