diff --git a/logs/exceptions.md b/logs/exceptions.md
new file mode 100644
index 0000000000..74b06dbf83
--- /dev/null
+++ b/logs/exceptions.md
@@ -0,0 +1,8 @@
+### Proto.Cluster.Tests
+`System.Exception: Failed to reach consensus` observed during cluster initialization in multiple tests (e.g., RedundantGossipTests.GossipRequest_is_sent_multiple_times_without_state_changes).
+
+### Proto.Cluster.Tests (NullReference)
+`System.NullReferenceException: Object reference not set to an instance of an object.` at `Gossiper.StartGossipActorAsync` during multiple tests in `Proto.Cluster.Tests`.
+
+### Proto.Cluster.Tests (Timeout)
+`System.TimeoutException: Request timed out` in `InMemoryPartitionActivatorClusterTests.HandlesSlowResponsesCorrectly`.
diff --git a/logs/log1755972031.md b/logs/log1755972031.md
new file mode 100644
index 0000000000..38ecc5390e
--- /dev/null
+++ b/logs/log1755972031.md
@@ -0,0 +1,10 @@
+# Refactor gossiper dependencies
+
+- decoupled `Gossiper` from `Cluster` by injecting all runtime dependencies via constructor
+- introduced `GossiperOptions` record to carry configuration values
+- added `GossiperFactory.FromCluster` static method for convenience when using `Cluster`
+- moved default heartbeat expiration handler to new `GossipDefaults` helper
+- updated `ClusterConfig` to reference new default handler and `Cluster` to use factory
+
+## Motivation
+Allows running gossip infrastructure without a full `Cluster` instance, enabling isolated testing with custom transports.
diff --git a/logs/log1755973076.md b/logs/log1755973076.md
new file mode 100644
index 0000000000..4d8fd64c13
--- /dev/null
+++ b/logs/log1755973076.md
@@ -0,0 +1,3 @@
+- expanded `IMemberList` with `GetMembers` to supply member IDs without extra delegate
+- consolidated `Gossiper` dependencies into `GossiperOptions` and removed `getMemberIds` parameter
+- updated `GossiperFactory` and `Cluster` to build options and initialize gossiper after `MemberList` creation
diff --git a/logs/log1755974004.md b/logs/log1755974004.md
new file mode 100644
index 0000000000..db7245420a
--- /dev/null
+++ b/logs/log1755974004.md
@@ -0,0 +1,3 @@
+- installed .NET 8 SDK to restore test capability after command errors
+- reran Proto.Cluster.Tests to investigate earlier NullReference in Gossiper; all 155 tests passed without reproducing
+- executed Proto.Actor.Tests and Proto.Remote.Tests to ensure regression-free environment, both suites passed
diff --git a/src/Proto.Cluster/Cluster.cs b/src/Proto.Cluster/Cluster.cs
index 2578a922ff..60b0568943 100644
--- a/src/Proto.Cluster/Cluster.cs
+++ b/src/Proto.Cluster/Cluster.cs
@@ -83,7 +83,6 @@ public Cluster(ActorSystem system, ClusterConfig config)
serialization.RegisterFileDescriptor(SeedContractsReflection.Descriptor);
serialization.RegisterFileDescriptor(EmptyReflection.Descriptor);
- Gossip = new Gossiper(this);
PidCache = new PidCache();
_ = new PubSubExtension(this);
@@ -106,7 +105,7 @@ public Cluster(ActorSystem system, ClusterConfig config)
internal IClusterContext ClusterContext { get; private set; } = null!;
- public Gossiper Gossip { get; }
+ public Gossiper Gossip { get; private set; } = null!;
///
/// Cluster config used by this cluster
@@ -202,6 +201,7 @@ private async Task BeginStartAsync(bool client)
Logger.LogInformation("Starting");
MemberList = new MemberList(this, client);
_ = MemberList.Started.ContinueWith(_ => _joinedClusterTcs.TrySetResult(true));
+ Gossip = Gossiper.FromCluster(this);
ClusterContext = Config.ClusterContextProducer(this);
var kinds = GetClusterKinds();
diff --git a/src/Proto.Cluster/ClusterConfig.cs b/src/Proto.Cluster/ClusterConfig.cs
index 6d52cd491b..378f105a39 100644
--- a/src/Proto.Cluster/ClusterConfig.cs
+++ b/src/Proto.Cluster/ClusterConfig.cs
@@ -400,7 +400,7 @@ IIdentityLookup identityLookup
/// The code to run when a member is expired from the cluster.
///
[JsonIgnore]
- public Func HeartbeatExpirationHandler { get; init; } = Gossiper.BlockExpiredMembers;
+ public Func HeartbeatExpirationHandler { get; init; } = GossipDefaults.BlockExpiredMembers;
///
/// Configures the code to run when a member is expired from the cluster.
diff --git a/src/Proto.Cluster/Gossip/GossipDefaults.cs b/src/Proto.Cluster/Gossip/GossipDefaults.cs
new file mode 100644
index 0000000000..dc51874b1e
--- /dev/null
+++ b/src/Proto.Cluster/Gossip/GossipDefaults.cs
@@ -0,0 +1,41 @@
+// -----------------------------------------------------------------------
+//
+// Copyright (C) 2015-2025 Asynkron AB All rights reserved
+//
+// -----------------------------------------------------------------------
+
+using System.Linq;
+using System.Threading.Tasks;
+
+namespace Proto.Cluster.Gossip;
+
+///
+/// Default handlers used by the gossip system.
+///
+public static class GossipDefaults
+{
+ ///
+ /// Blocks members whose heartbeat has expired.
+ ///
+ public static async Task BlockExpiredMembers(Cluster cluster)
+ {
+ var gossipState = await cluster.Gossip.GetStateEntry(GossipKeys.Heartbeat).ConfigureAwait(false);
+ var blockList = cluster.Remote.BlockList;
+ var alreadyBlocked = blockList.BlockedMembers;
+ //new blocked members
+ var blocked = (from x in gossipState
+ //never block ourselves
+ where x.Key != cluster.System.Id
+ //pick any entry that is too old
+ where x.Value.Age > cluster.Config.HeartbeatExpiration
+ //and not already part of the block list
+ where !alreadyBlocked.Contains(x.Key)
+ select x.Key)
+ .ToArray();
+
+ if (blocked.Any())
+ {
+ blockList.Block(blocked, "Expired heartbeat");
+ }
+ }
+}
diff --git a/src/Proto.Cluster/Gossip/Gossiper.cs b/src/Proto.Cluster/Gossip/Gossiper.cs
index 13a81a77a1..f7822e738a 100644
--- a/src/Proto.Cluster/Gossip/Gossiper.cs
+++ b/src/Proto.Cluster/Gossip/Gossiper.cs
@@ -44,23 +44,54 @@ public record AddConsensusCheck(ConsensusCheck Check, CancellationToken Token);
public record GetGossipStateSnapshot;
+public sealed record GossiperOptions(
+ IRootContext Context,
+ IMemberList MemberList,
+ BlockList BlockList,
+ EventStream EventStream,
+ string SystemId,
+ Task JoinedCluster,
+ CancellationToken Shutdown,
+ Func GetActorStatistics,
+ int GossipFanout,
+ int GossipMaxSend,
+ TimeSpan GossipInterval,
+ TimeSpan GossipRequestTimeout,
+ bool GossipDebugLogging,
+ TimeSpan HeartbeatExpiration,
+ Func HeartbeatExpirationHandler);
+
[PublicAPI]
-public class Gossiper
+public partial class Gossiper
{
public const string GossipActorName = "$gossip";
#pragma warning disable CS0618 // Type or member is obsolete
private static readonly ILogger Logger = Log.CreateLogger();
#pragma warning restore CS0618 // Type or member is obsolete
- private readonly Cluster _cluster;
private readonly IRootContext _context;
+ private readonly IMemberList _memberList;
+ private readonly BlockList _blockList;
+ private readonly EventStream _eventStream;
+ private readonly string _systemId;
+ private readonly Task _joinedCluster;
+ private readonly CancellationToken _shutdown;
+ private readonly Func _getActorStatistics;
+ private readonly GossiperOptions _options;
private IGossip _gossip = null!;
private PID _pid = null!;
- public Gossiper(Cluster cluster)
+ public Gossiper(GossiperOptions options)
{
- _cluster = cluster;
- _context = _cluster.System.Root;
+ _options = options;
+ _context = options.Context;
+ _memberList = options.MemberList;
+ _blockList = options.BlockList;
+ _eventStream = options.EventStream;
+ _systemId = options.SystemId;
+ _joinedCluster = options.JoinedCluster;
+ _shutdown = options.Shutdown;
+ _getActorStatistics = options.GetActorStatistics;
}
///
@@ -190,30 +221,30 @@ public async Task SetStateAsync(string key, IMessage value)
internal Task StartGossipActorAsync(IGossip? gossip = null, IGossipTransport? transport = null)
{
_gossip = gossip ?? new Gossip(
- _cluster.System.Id,
- _cluster.Config.GossipFanout,
- _cluster.Config.GossipMaxSend,
- _cluster.System.Logger(),
- () => _cluster.MemberList.GetMembers(),
- _cluster.Config.GossipDebugLogging);
+ _systemId,
+ _options.GossipFanout,
+ _options.GossipMaxSend,
+ _context.System.Logger(),
+ _memberList.GetMembers,
+ _options.GossipDebugLogging);
var props = Props.FromProducer(() => new GossipActor(
- _cluster.Config.GossipRequestTimeout,
+ _options.GossipRequestTimeout,
_gossip,
transport ?? new GossipTransport(),
- _cluster.MemberList,
- _cluster.System.Remote().BlockList,
- _cluster.Config.GossipDebugLogging));
+ _memberList,
+ _blockList,
+ _options.GossipDebugLogging));
_pid = _context.SpawnNamedSystem(props, GossipActorName);
- _cluster.System.EventStream.Subscribe(topology =>
+ _eventStream.Subscribe(topology =>
{
var tmp = topology.Clone();
tmp.Joined.Clear();
tmp.Left.Clear();
_context.Send(_pid, tmp);
});
-
+
return Task.CompletedTask;
}
@@ -229,16 +260,16 @@ internal Task StartgossipLoopAsync()
private async Task GossipLoop()
{
Logger.LogInformation("Gossip is waiting for cluster to join");
- await _cluster.JoinedCluster;
+ await _joinedCluster;
Logger.LogInformation("Starting gossip loop");
await Task.Yield();
- while (!_cluster.System.Shutdown.IsCancellationRequested)
+ while (!_shutdown.IsCancellationRequested)
{
try
{
// Space out gossip broadcasts according to configured interval
- await Task.Delay(_cluster.Config.GossipInterval).ConfigureAwait(false);
+ await Task.Delay(_options.GossipInterval).ConfigureAwait(false);
await BlockExpiredHeartbeats().ConfigureAwait(false);
@@ -254,7 +285,7 @@ private async Task GossipLoop()
}
catch (DeadLetterException)
{
- if (_cluster.System.Shutdown.IsCancellationRequested)
+ if (_shutdown.IsCancellationRequested)
{
//pass. this is expected, system is shutting down
}
@@ -275,65 +306,31 @@ private async Task BlockGracefullyLeft()
{
var t2 = await GetStateEntry(GossipKeys.GracefullyLeft).ConfigureAwait(false);
- var blockList = _cluster.System.Remote().BlockList;
- var alreadyBlocked = blockList.BlockedMembers;
+ var alreadyBlocked = _blockList.BlockedMembers;
//don't ban ourselves. our gossip state will never reach other members then...
var gracefullyLeft = t2.Keys
.Where(k => !alreadyBlocked.Contains(k))
- .Where(k => k != _cluster.System.Id)
+ .Where(k => k != _systemId)
.ToArray();
if (gracefullyLeft.Any())
{
- blockList.Block(gracefullyLeft, "Gracefully left");
+ _blockList.Block(gracefullyLeft, "Gracefully left");
}
}
private async Task BlockExpiredHeartbeats()
{
- if (_cluster.Config.HeartbeatExpiration == TimeSpan.Zero)
+ if (_options.HeartbeatExpiration == TimeSpan.Zero)
{
return;
}
-
- await _cluster.Config.HeartbeatExpirationHandler(_cluster);
- }
- public static async Task BlockExpiredMembers(Cluster cluster)
- {
- var gossipState = await cluster.Gossip. GetStateEntry(GossipKeys.Heartbeat).ConfigureAwait(false);
- var blockList = cluster.Remote.BlockList;
- var alreadyBlocked = blockList.BlockedMembers;
- //new blocked members
- var blocked = (from x in gossipState
- //never block ourselves
- where x.Key != cluster.System.Id
- //pick any entry that is too old
- where x.Value.Age > cluster.Config.HeartbeatExpiration
- //and not already part of the block list
- where !alreadyBlocked.Contains(x.Key)
- select x.Key)
- .ToArray();
-
- if (blocked.Any())
- {
- blockList.Block(blocked, "Expired heartbeat");
- }
+ await _options.HeartbeatExpirationHandler().ConfigureAwait(false);
}
- private ActorStatistics GetActorStatistics()
- {
- var stats = new ActorStatistics();
-
- foreach (var k in _cluster.GetClusterKinds())
- {
- var kind = _cluster.GetClusterKind(k);
- stats.ActorCount.Add(k, kind.Count);
- }
-
- return stats;
- }
+ private ActorStatistics GetActorStatistics() => _getActorStatistics();
///
/// Helper for composing logic over one or more gossip keys.
diff --git a/src/Proto.Cluster/Gossip/GossiperFactory.cs b/src/Proto.Cluster/Gossip/GossiperFactory.cs
new file mode 100644
index 0000000000..bdf5655f37
--- /dev/null
+++ b/src/Proto.Cluster/Gossip/GossiperFactory.cs
@@ -0,0 +1,50 @@
+// -----------------------------------------------------------------------
+//
+// Copyright (C) 2015-2025 Asynkron AB All rights reserved
+//
+// -----------------------------------------------------------------------
+
+using System;
+using System.Threading.Tasks;
+using Proto.Remote;
+
+namespace Proto.Cluster.Gossip;
+
+public partial class Gossiper
+{
+ ///
+ /// Creates a using the traditional cluster dependencies.
+ ///
+ public static Gossiper FromCluster(Cluster cluster)
+ {
+ var options = new GossiperOptions(
+ cluster.System.Root,
+ cluster.MemberList,
+ cluster.System.Remote().BlockList,
+ cluster.System.EventStream,
+ cluster.System.Id,
+ cluster.JoinedCluster,
+ cluster.System.Shutdown,
+ () =>
+ {
+ var stats = new ActorStatistics();
+ foreach (var k in cluster.GetClusterKinds())
+ {
+ var kind = cluster.GetClusterKind(k);
+ stats.ActorCount.Add(k, kind.Count);
+ }
+
+ return stats;
+ },
+ cluster.Config.GossipFanout,
+ cluster.Config.GossipMaxSend,
+ cluster.Config.GossipInterval,
+ cluster.Config.GossipRequestTimeout,
+ cluster.Config.GossipDebugLogging,
+ cluster.Config.HeartbeatExpiration,
+ () => cluster.Config.HeartbeatExpirationHandler(cluster)
+ );
+
+ return new Gossiper(options);
+ }
+}
diff --git a/src/Proto.Cluster/Membership/IMemberList.cs b/src/Proto.Cluster/Membership/IMemberList.cs
index 2bf37273d8..54d3dc6426 100644
--- a/src/Proto.Cluster/Membership/IMemberList.cs
+++ b/src/Proto.Cluster/Membership/IMemberList.cs
@@ -1,9 +1,12 @@
namespace Proto.Cluster;
+using System.Collections.Immutable;
+
public interface IMemberList
{
bool ContainsMemberId(string memberId);
bool TryGetMember(string memberId, out Member? value);
Member Self { get; }
+ ImmutableHashSet GetMembers();
}