From 0e7fc72b5f69a1b01fbb893effa4eb67c9abd541 Mon Sep 17 00:00:00 2001 From: Roger Johansson Date: Sat, 23 Aug 2025 20:46:02 +0200 Subject: [PATCH] docs: log cluster test investigation --- logs/exceptions.md | 8 ++ logs/log1755972031.md | 10 ++ logs/log1755973076.md | 3 + logs/log1755974004.md | 3 + src/Proto.Cluster/Cluster.cs | 4 +- src/Proto.Cluster/ClusterConfig.cs | 2 +- src/Proto.Cluster/Gossip/GossipDefaults.cs | 41 +++++++ src/Proto.Cluster/Gossip/Gossiper.cs | 119 ++++++++++---------- src/Proto.Cluster/Gossip/GossiperFactory.cs | 50 ++++++++ src/Proto.Cluster/Membership/IMemberList.cs | 3 + 10 files changed, 179 insertions(+), 64 deletions(-) create mode 100644 logs/exceptions.md create mode 100644 logs/log1755972031.md create mode 100644 logs/log1755973076.md create mode 100644 logs/log1755974004.md create mode 100644 src/Proto.Cluster/Gossip/GossipDefaults.cs create mode 100644 src/Proto.Cluster/Gossip/GossiperFactory.cs diff --git a/logs/exceptions.md b/logs/exceptions.md new file mode 100644 index 0000000000..74b06dbf83 --- /dev/null +++ b/logs/exceptions.md @@ -0,0 +1,8 @@ +### Proto.Cluster.Tests +`System.Exception: Failed to reach consensus` observed during cluster initialization in multiple tests (e.g., RedundantGossipTests.GossipRequest_is_sent_multiple_times_without_state_changes). + +### Proto.Cluster.Tests (NullReference) +`System.NullReferenceException: Object reference not set to an instance of an object.` at `Gossiper.StartGossipActorAsync` during multiple tests in `Proto.Cluster.Tests`. + +### Proto.Cluster.Tests (Timeout) +`System.TimeoutException: Request timed out` in `InMemoryPartitionActivatorClusterTests.HandlesSlowResponsesCorrectly`. 
diff --git a/logs/log1755972031.md b/logs/log1755972031.md new file mode 100644 index 0000000000..38ecc5390e --- /dev/null +++ b/logs/log1755972031.md @@ -0,0 +1,10 @@ +# Refactor gossiper dependencies + +- decoupled `Gossiper` from `Cluster` by injecting all runtime dependencies via constructor +- introduced `GossiperOptions` record to carry configuration values +- added `GossiperFactory.FromCluster` static method for convenience when using `Cluster` +- moved default heartbeat expiration handler to new `GossipDefaults` helper +- updated `ClusterConfig` to reference new default handler and `Cluster` to use factory + +## Motivation +Allows running gossip infrastructure without a full `Cluster` instance, enabling isolated testing with custom transports. diff --git a/logs/log1755973076.md b/logs/log1755973076.md new file mode 100644 index 0000000000..4d8fd64c13 --- /dev/null +++ b/logs/log1755973076.md @@ -0,0 +1,3 @@ +- expanded `IMemberList` with `GetMembers` to supply member IDs without extra delegate +- consolidated `Gossiper` dependencies into `GossiperOptions` and removed `getMemberIds` parameter +- updated `GossiperFactory` and `Cluster` to build options and initialize gossiper after `MemberList` creation diff --git a/logs/log1755974004.md b/logs/log1755974004.md new file mode 100644 index 0000000000..db7245420a --- /dev/null +++ b/logs/log1755974004.md @@ -0,0 +1,3 @@ +- installed .NET 8 SDK to restore test capability after command errors +- reran Proto.Cluster.Tests to investigate earlier NullReference in Gossiper; all 155 tests passed without reproducing +- executed Proto.Actor.Tests and Proto.Remote.Tests to ensure regression-free environment, both suites passed diff --git a/src/Proto.Cluster/Cluster.cs b/src/Proto.Cluster/Cluster.cs index 2578a922ff..60b0568943 100644 --- a/src/Proto.Cluster/Cluster.cs +++ b/src/Proto.Cluster/Cluster.cs @@ -83,7 +83,6 @@ public Cluster(ActorSystem system, ClusterConfig config) 
serialization.RegisterFileDescriptor(SeedContractsReflection.Descriptor); serialization.RegisterFileDescriptor(EmptyReflection.Descriptor); - Gossip = new Gossiper(this); PidCache = new PidCache(); _ = new PubSubExtension(this); @@ -106,7 +105,7 @@ public Cluster(ActorSystem system, ClusterConfig config) internal IClusterContext ClusterContext { get; private set; } = null!; - public Gossiper Gossip { get; } + public Gossiper Gossip { get; private set; } = null!; /// /// Cluster config used by this cluster @@ -202,6 +201,7 @@ private async Task BeginStartAsync(bool client) Logger.LogInformation("Starting"); MemberList = new MemberList(this, client); _ = MemberList.Started.ContinueWith(_ => _joinedClusterTcs.TrySetResult(true)); + Gossip = Gossiper.FromCluster(this); ClusterContext = Config.ClusterContextProducer(this); var kinds = GetClusterKinds(); diff --git a/src/Proto.Cluster/ClusterConfig.cs b/src/Proto.Cluster/ClusterConfig.cs index 6d52cd491b..378f105a39 100644 --- a/src/Proto.Cluster/ClusterConfig.cs +++ b/src/Proto.Cluster/ClusterConfig.cs @@ -400,7 +400,7 @@ IIdentityLookup identityLookup /// The code to run when a member is expired from the cluster. /// [JsonIgnore] - public Func HeartbeatExpirationHandler { get; init; } = Gossiper.BlockExpiredMembers; + public Func HeartbeatExpirationHandler { get; init; } = GossipDefaults.BlockExpiredMembers; /// /// Configures the code to run when a member is expired from the cluster. 
diff --git a/src/Proto.Cluster/Gossip/GossipDefaults.cs b/src/Proto.Cluster/Gossip/GossipDefaults.cs new file mode 100644 index 0000000000..dc51874b1e --- /dev/null +++ b/src/Proto.Cluster/Gossip/GossipDefaults.cs @@ -0,0 +1,41 @@ +// ----------------------------------------------------------------------- +// +// Copyright (C) 2015-2025 Asynkron AB All rights reserved +// +// ----------------------------------------------------------------------- + +using System.Linq; +using System.Threading.Tasks; + +namespace Proto.Cluster.Gossip; + +/// +/// Default handlers used by the gossip system. +/// +public static class GossipDefaults +{ + /// + /// Blocks members whose heartbeat has expired. + /// + public static async Task BlockExpiredMembers(Cluster cluster) + { + var gossipState = await cluster.Gossip.GetStateEntry(GossipKeys.Heartbeat).ConfigureAwait(false); + var blockList = cluster.Remote.BlockList; + var alreadyBlocked = blockList.BlockedMembers; + //new blocked members + var blocked = (from x in gossipState + //never block ourselves + where x.Key != cluster.System.Id + //pick any entry that is too old + where x.Value.Age > cluster.Config.HeartbeatExpiration + //and not already part of the block list + where !alreadyBlocked.Contains(x.Key) + select x.Key) + .ToArray(); + + if (blocked.Any()) + { + blockList.Block(blocked, "Expired heartbeat"); + } + } +} diff --git a/src/Proto.Cluster/Gossip/Gossiper.cs b/src/Proto.Cluster/Gossip/Gossiper.cs index 13a81a77a1..f7822e738a 100644 --- a/src/Proto.Cluster/Gossip/Gossiper.cs +++ b/src/Proto.Cluster/Gossip/Gossiper.cs @@ -44,23 +44,54 @@ public record AddConsensusCheck(ConsensusCheck Check, CancellationToken Token); public record GetGossipStateSnapshot; +public sealed record GossiperOptions( + IRootContext Context, + IMemberList MemberList, + BlockList BlockList, + EventStream EventStream, + string SystemId, + Task JoinedCluster, + CancellationToken Shutdown, + Func GetActorStatistics, + int GossipFanout, + int 
GossipMaxSend, + TimeSpan GossipInterval, + TimeSpan GossipRequestTimeout, + bool GossipDebugLogging, + TimeSpan HeartbeatExpiration, + Func HeartbeatExpirationHandler); + [PublicAPI] -public class Gossiper +public partial class Gossiper { public const string GossipActorName = "$gossip"; #pragma warning disable CS0618 // Type or member is obsolete private static readonly ILogger Logger = Log.CreateLogger(); #pragma warning restore CS0618 // Type or member is obsolete - private readonly Cluster _cluster; private readonly IRootContext _context; + private readonly IMemberList _memberList; + private readonly BlockList _blockList; + private readonly EventStream _eventStream; + private readonly string _systemId; + private readonly Task _joinedCluster; + private readonly CancellationToken _shutdown; + private readonly Func _getActorStatistics; + private readonly GossiperOptions _options; private IGossip _gossip = null!; private PID _pid = null!; - public Gossiper(Cluster cluster) + public Gossiper(GossiperOptions options) { - _cluster = cluster; - _context = _cluster.System.Root; + _options = options; + _context = options.Context; + _memberList = options.MemberList; + _blockList = options.BlockList; + _eventStream = options.EventStream; + _systemId = options.SystemId; + _joinedCluster = options.JoinedCluster; + _shutdown = options.Shutdown; + _getActorStatistics = options.GetActorStatistics; } /// @@ -190,30 +221,30 @@ public async Task SetStateAsync(string key, IMessage value) internal Task StartGossipActorAsync(IGossip? gossip = null, IGossipTransport? transport = null) { _gossip = gossip ?? 
new Gossip( - _cluster.System.Id, - _cluster.Config.GossipFanout, - _cluster.Config.GossipMaxSend, - _cluster.System.Logger(), - () => _cluster.MemberList.GetMembers(), - _cluster.Config.GossipDebugLogging); + _systemId, + _options.GossipFanout, + _options.GossipMaxSend, + _context.System.Logger(), + _memberList.GetMembers, + _options.GossipDebugLogging); var props = Props.FromProducer(() => new GossipActor( - _cluster.Config.GossipRequestTimeout, + _options.GossipRequestTimeout, _gossip, transport ?? new GossipTransport(), - _cluster.MemberList, - _cluster.System.Remote().BlockList, - _cluster.Config.GossipDebugLogging)); + _memberList, + _blockList, + _options.GossipDebugLogging)); _pid = _context.SpawnNamedSystem(props, GossipActorName); - _cluster.System.EventStream.Subscribe(topology => + _eventStream.Subscribe(topology => { var tmp = topology.Clone(); tmp.Joined.Clear(); tmp.Left.Clear(); _context.Send(_pid, tmp); }); - + return Task.CompletedTask; } @@ -229,16 +260,16 @@ internal Task StartgossipLoopAsync() private async Task GossipLoop() { Logger.LogInformation("Gossip is waiting for cluster to join"); - await _cluster.JoinedCluster; + await _joinedCluster; Logger.LogInformation("Starting gossip loop"); await Task.Yield(); - while (!_cluster.System.Shutdown.IsCancellationRequested) + while (!_shutdown.IsCancellationRequested) { try { // Space out gossip broadcasts according to configured interval - await Task.Delay(_cluster.Config.GossipInterval).ConfigureAwait(false); + await Task.Delay(_options.GossipInterval).ConfigureAwait(false); await BlockExpiredHeartbeats().ConfigureAwait(false); @@ -254,7 +285,7 @@ private async Task GossipLoop() } catch (DeadLetterException) { - if (_cluster.System.Shutdown.IsCancellationRequested) + if (_shutdown.IsCancellationRequested) { //pass. 
this is expected, system is shutting down } @@ -275,65 +306,31 @@ private async Task BlockGracefullyLeft() { var t2 = await GetStateEntry(GossipKeys.GracefullyLeft).ConfigureAwait(false); - var blockList = _cluster.System.Remote().BlockList; - var alreadyBlocked = blockList.BlockedMembers; + var alreadyBlocked = _blockList.BlockedMembers; //don't ban ourselves. our gossip state will never reach other members then... var gracefullyLeft = t2.Keys .Where(k => !alreadyBlocked.Contains(k)) - .Where(k => k != _cluster.System.Id) + .Where(k => k != _systemId) .ToArray(); if (gracefullyLeft.Any()) { - blockList.Block(gracefullyLeft, "Gracefully left"); + _blockList.Block(gracefullyLeft, "Gracefully left"); } } private async Task BlockExpiredHeartbeats() { - if (_cluster.Config.HeartbeatExpiration == TimeSpan.Zero) + if (_options.HeartbeatExpiration == TimeSpan.Zero) { return; } - - await _cluster.Config.HeartbeatExpirationHandler(_cluster); - } - public static async Task BlockExpiredMembers(Cluster cluster) - { - var gossipState = await cluster.Gossip. 
GetStateEntry(GossipKeys.Heartbeat).ConfigureAwait(false); - var blockList = cluster.Remote.BlockList; - var alreadyBlocked = blockList.BlockedMembers; - //new blocked members - var blocked = (from x in gossipState - //never block ourselves - where x.Key != cluster.System.Id - //pick any entry that is too old - where x.Value.Age > cluster.Config.HeartbeatExpiration - //and not already part of the block list - where !alreadyBlocked.Contains(x.Key) - select x.Key) - .ToArray(); - - if (blocked.Any()) - { - blockList.Block(blocked, "Expired heartbeat"); - } + await _options.HeartbeatExpirationHandler().ConfigureAwait(false); } - private ActorStatistics GetActorStatistics() - { - var stats = new ActorStatistics(); - - foreach (var k in _cluster.GetClusterKinds()) - { - var kind = _cluster.GetClusterKind(k); - stats.ActorCount.Add(k, kind.Count); - } - - return stats; - } + private ActorStatistics GetActorStatistics() => _getActorStatistics(); /// /// Helper for composing logic over one or more gossip keys. diff --git a/src/Proto.Cluster/Gossip/GossiperFactory.cs b/src/Proto.Cluster/Gossip/GossiperFactory.cs new file mode 100644 index 0000000000..bdf5655f37 --- /dev/null +++ b/src/Proto.Cluster/Gossip/GossiperFactory.cs @@ -0,0 +1,50 @@ +// ----------------------------------------------------------------------- +// +// Copyright (C) 2015-2025 Asynkron AB All rights reserved +// +// ----------------------------------------------------------------------- + +using System; +using System.Threading.Tasks; +using Proto.Remote; + +namespace Proto.Cluster.Gossip; + +public partial class Gossiper +{ + /// + /// Creates a <see cref="Gossiper"/> using the traditional cluster dependencies. 
+ /// + public static Gossiper FromCluster(Cluster cluster) + { + var options = new GossiperOptions( + cluster.System.Root, + cluster.MemberList, + cluster.System.Remote().BlockList, + cluster.System.EventStream, + cluster.System.Id, + cluster.JoinedCluster, + cluster.System.Shutdown, + () => + { + var stats = new ActorStatistics(); + foreach (var k in cluster.GetClusterKinds()) + { + var kind = cluster.GetClusterKind(k); + stats.ActorCount.Add(k, kind.Count); + } + + return stats; + }, + cluster.Config.GossipFanout, + cluster.Config.GossipMaxSend, + cluster.Config.GossipInterval, + cluster.Config.GossipRequestTimeout, + cluster.Config.GossipDebugLogging, + cluster.Config.HeartbeatExpiration, + () => cluster.Config.HeartbeatExpirationHandler(cluster) + ); + + return new Gossiper(options); + } +} diff --git a/src/Proto.Cluster/Membership/IMemberList.cs b/src/Proto.Cluster/Membership/IMemberList.cs index 2bf37273d8..54d3dc6426 100644 --- a/src/Proto.Cluster/Membership/IMemberList.cs +++ b/src/Proto.Cluster/Membership/IMemberList.cs @@ -1,9 +1,12 @@ namespace Proto.Cluster; +using System.Collections.Immutable; + public interface IMemberList { bool ContainsMemberId(string memberId); bool TryGetMember(string memberId, out Member? value); Member Self { get; } + ImmutableHashSet GetMembers(); }