@@ -44,23 +44,54 @@ public record AddConsensusCheck(ConsensusCheck Check, CancellationToken Token);
4444
4545public record GetGossipStateSnapshot ;
4646
47+ public sealed record GossiperOptions (
48+ IRootContext Context ,
49+ IMemberList MemberList ,
50+ BlockList BlockList ,
51+ EventStream EventStream ,
52+ string SystemId ,
53+ Task JoinedCluster ,
54+ CancellationToken Shutdown ,
55+ Func < ActorStatistics > GetActorStatistics ,
56+ int GossipFanout ,
57+ int GossipMaxSend ,
58+ TimeSpan GossipInterval ,
59+ TimeSpan GossipRequestTimeout ,
60+ bool GossipDebugLogging ,
61+ TimeSpan HeartbeatExpiration ,
62+ Func < Task > HeartbeatExpirationHandler ) ;
63+
4764[ PublicAPI ]
48- public class Gossiper
65+ public partial class Gossiper
4966{
5067 public const string GossipActorName = "$gossip" ;
5168
5269#pragma warning disable CS0618 // Type or member is obsolete
5370 private static readonly ILogger Logger = Log . CreateLogger < Gossiper > ( ) ;
5471#pragma warning restore CS0618 // Type or member is obsolete
55- private readonly Cluster _cluster ;
5672 private readonly IRootContext _context ;
73+ private readonly IMemberList _memberList ;
74+ private readonly BlockList _blockList ;
75+ private readonly EventStream _eventStream ;
76+ private readonly string _systemId ;
77+ private readonly Task _joinedCluster ;
78+ private readonly CancellationToken _shutdown ;
79+ private readonly Func < ActorStatistics > _getActorStatistics ;
80+ private readonly GossiperOptions _options ;
5781 private IGossip _gossip = null ! ;
5882 private PID _pid = null ! ;
5983
60- public Gossiper ( Cluster cluster )
84+ public Gossiper ( GossiperOptions options )
6185 {
62- _cluster = cluster ;
63- _context = _cluster . System . Root ;
86+ _options = options ;
87+ _context = options . Context ;
88+ _memberList = options . MemberList ;
89+ _blockList = options . BlockList ;
90+ _eventStream = options . EventStream ;
91+ _systemId = options . SystemId ;
92+ _joinedCluster = options . JoinedCluster ;
93+ _shutdown = options . Shutdown ;
94+ _getActorStatistics = options . GetActorStatistics ;
6495 }
6596
6697 /// <summary>
@@ -190,30 +221,30 @@ public async Task SetStateAsync(string key, IMessage value)
190221 internal Task StartGossipActorAsync ( IGossip ? gossip = null , IGossipTransport ? transport = null )
191222 {
192223 _gossip = gossip ?? new Gossip (
193- _cluster . System . Id ,
194- _cluster . Config . GossipFanout ,
195- _cluster . Config . GossipMaxSend ,
196- _cluster . System . Logger ( ) ,
197- ( ) => _cluster . MemberList . GetMembers ( ) ,
198- _cluster . Config . GossipDebugLogging ) ;
224+ _systemId ,
225+ _options . GossipFanout ,
226+ _options . GossipMaxSend ,
227+ _context . System . Logger ( ) ,
228+ _memberList . GetMembers ,
229+ _options . GossipDebugLogging ) ;
199230
200231 var props = Props . FromProducer ( ( ) => new GossipActor (
201- _cluster . Config . GossipRequestTimeout ,
232+ _options . GossipRequestTimeout ,
202233 _gossip ,
203234 transport ?? new GossipTransport ( ) ,
204- _cluster . MemberList ,
205- _cluster . System . Remote ( ) . BlockList ,
206- _cluster . Config . GossipDebugLogging ) ) ;
235+ _memberList ,
236+ _blockList ,
237+ _options . GossipDebugLogging ) ) ;
207238
208239 _pid = _context . SpawnNamedSystem ( props , GossipActorName ) ;
209- _cluster . System . EventStream . Subscribe < ClusterTopology > ( topology =>
240+ _eventStream . Subscribe < ClusterTopology > ( topology =>
210241 {
211242 var tmp = topology . Clone ( ) ;
212243 tmp . Joined . Clear ( ) ;
213244 tmp . Left . Clear ( ) ;
214245 _context . Send ( _pid , tmp ) ;
215246 } ) ;
216-
247+
217248 return Task . CompletedTask ;
218249 }
219250
@@ -229,16 +260,16 @@ internal Task StartgossipLoopAsync()
229260 private async Task GossipLoop ( )
230261 {
231262 Logger . LogInformation ( "Gossip is waiting for cluster to join" ) ;
232- await _cluster . JoinedCluster ;
263+ await _joinedCluster ;
233264 Logger . LogInformation ( "Starting gossip loop" ) ;
234265 await Task . Yield ( ) ;
235266
236- while ( ! _cluster . System . Shutdown . IsCancellationRequested )
267+ while ( ! _shutdown . IsCancellationRequested )
237268 {
238269 try
239270 {
240271 // Space out gossip broadcasts according to configured interval
241- await Task . Delay ( _cluster . Config . GossipInterval ) . ConfigureAwait ( false ) ;
272+ await Task . Delay ( _options . GossipInterval ) . ConfigureAwait ( false ) ;
242273
243274 await BlockExpiredHeartbeats ( ) . ConfigureAwait ( false ) ;
244275
@@ -254,7 +285,7 @@ private async Task GossipLoop()
254285 }
255286 catch ( DeadLetterException )
256287 {
257- if ( _cluster . System . Shutdown . IsCancellationRequested )
288+ if ( _shutdown . IsCancellationRequested )
258289 {
259290 //pass. this is expected, system is shutting down
260291 }
@@ -275,65 +306,31 @@ private async Task BlockGracefullyLeft()
275306 {
276307 var t2 = await GetStateEntry ( GossipKeys . GracefullyLeft ) . ConfigureAwait ( false ) ;
277308
278- var blockList = _cluster . System . Remote ( ) . BlockList ;
279- var alreadyBlocked = blockList . BlockedMembers ;
309+ var alreadyBlocked = _blockList . BlockedMembers ;
280310
281311 //don't ban ourselves. our gossip state will never reach other members then...
282312 var gracefullyLeft = t2 . Keys
283313 . Where ( k => ! alreadyBlocked . Contains ( k ) )
284- . Where ( k => k != _cluster . System . Id )
314+ . Where ( k => k != _systemId )
285315 . ToArray ( ) ;
286316
287317 if ( gracefullyLeft . Any ( ) )
288318 {
289- blockList . Block ( gracefullyLeft , "Gracefully left" ) ;
319+ _blockList . Block ( gracefullyLeft , "Gracefully left" ) ;
290320 }
291321 }
292322
293323 private async Task BlockExpiredHeartbeats ( )
294324 {
295- if ( _cluster . Config . HeartbeatExpiration == TimeSpan . Zero )
325+ if ( _options . HeartbeatExpiration == TimeSpan . Zero )
296326 {
297327 return ;
298328 }
299-
300- await _cluster . Config . HeartbeatExpirationHandler ( _cluster ) ;
301- }
302329
303- public static async Task BlockExpiredMembers ( Cluster cluster )
304- {
305- var gossipState = await cluster . Gossip . GetStateEntry ( GossipKeys . Heartbeat ) . ConfigureAwait ( false ) ;
306- var blockList = cluster . Remote . BlockList ;
307- var alreadyBlocked = blockList . BlockedMembers ;
308- //new blocked members
309- var blocked = ( from x in gossipState
310- //never block ourselves
311- where x . Key != cluster . System . Id
312- //pick any entry that is too old
313- where x . Value . Age > cluster . Config . HeartbeatExpiration
314- //and not already part of the block list
315- where ! alreadyBlocked . Contains ( x . Key )
316- select x . Key )
317- . ToArray ( ) ;
318-
319- if ( blocked . Any ( ) )
320- {
321- blockList . Block ( blocked , "Expired heartbeat" ) ;
322- }
330+ await _options . HeartbeatExpirationHandler ( ) . ConfigureAwait ( false ) ;
323331 }
324332
325- private ActorStatistics GetActorStatistics ( )
326- {
327- var stats = new ActorStatistics ( ) ;
328-
329- foreach ( var k in _cluster . GetClusterKinds ( ) )
330- {
331- var kind = _cluster . GetClusterKind ( k ) ;
332- stats . ActorCount . Add ( k , kind . Count ) ;
333- }
334-
335- return stats ;
336- }
333+ private ActorStatistics GetActorStatistics ( ) => _getActorStatistics ( ) ;
337334
338335 /// <summary>
339336 /// Helper for composing <see cref="ConsensusCheck{T}" /> logic over one or more gossip keys.
0 commit comments