Skip to content

Commit c2c6c1f

Browse files
authored
feat(gossip): inject member and block list dependencies (#2357)
1 parent fe821a2 commit c2c6c1f

File tree

9 files changed

+47
-21
lines changed

9 files changed

+47
-21
lines changed

logs/log1755949362.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
- Introduced IMemberList and IBlockList interfaces to decouple GossipActor from concrete MemberList and BlockList implementations.
2+
- Modified GossipActor to receive these interfaces, improving testability and making member/block lookups explicit dependencies.
3+
- Updated GossipSender to operate on IMemberList, reducing reliance on full Cluster object.
4+
- Adjusted Gossiper to supply MemberList and BlockList when creating GossipActor, ensuring compatibility with new constructor.
5+
- Revised GossipTransport tests to accommodate new GossipSender signature.

src/Proto.Cluster/Gossip/GossipActor.cs

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ public class GossipActor : IActor
2020
#pragma warning restore CS0618 // Type or member is obsolete
2121
private readonly TimeSpan _gossipRequestTimeout;
2222
private readonly IGossip _internal;
23+
private readonly IMemberList _memberList;
24+
private readonly IBlockList _blockList;
2325
private readonly IGossipTransport _transport;
2426

2527
// lookup from state key -> consensus checks
@@ -31,12 +33,16 @@ public GossipActor(
3133
int gossipFanout,
3234
int gossipMaxSend,
3335
IGossip gossip,
34-
IGossipTransport transport
36+
IGossipTransport transport,
37+
IMemberList memberList,
38+
IBlockList blockList
3539
)
3640
{
3741
_gossipRequestTimeout = gossipRequestTimeout;
3842
_internal = gossip;
3943
_transport = transport;
44+
_memberList = memberList;
45+
_blockList = blockList;
4046
}
4147

4248
public async Task ReceiveAsync(IContext context)
@@ -121,7 +127,7 @@ private Task OnGossipRequest(IContext context, GossipRequest gossipRequest)
121127
var logger = context.Logger()?.BeginScope<GossipActor>();
122128
logger?.LogDebug("Gossip Request {Sender}", context.Sender!);
123129

124-
if (context.Remote().BlockList.BlockedMembers.Contains(gossipRequest.MemberId))
130+
if (_blockList.BlockedMembers.Contains(gossipRequest.MemberId))
125131
{
126132
Logger.LogInformation("Blocked gossip request from {MemberId}", gossipRequest.MemberId);
127133
context.Respond(new GossipResponse()
@@ -130,8 +136,7 @@ private Task OnGossipRequest(IContext context, GossipRequest gossipRequest)
130136
});
131137
return Task.CompletedTask;
132138
}
133-
134-
if (!context.Cluster().MemberList.ContainsMemberId(gossipRequest.MemberId))
139+
if (!_memberList.ContainsMemberId(gossipRequest.MemberId))
135140
{
136141
Logger.LogInformation("Ignoring gossip request from {MemberId} as it is not a member", gossipRequest.MemberId);
137142
context.Respond(new GossipResponse()
@@ -140,18 +145,13 @@ private Task OnGossipRequest(IContext context, GossipRequest gossipRequest)
140145
});
141146
return Task.CompletedTask;
142147
}
143-
144-
145-
146148

147149
if (Logger.IsEnabled(LogLevel.Debug))
148150
{
149151
Logger.LogDebug("Gossip Request {Sender}", context.Sender!);
150152
}
151153

152154
ReceiveState(context, gossipRequest.State);
153-
154-
155155
if (context.Cluster().Config.GossipDebugLogging)
156156
{
157157
Logger.LogInformation("Responding to GossipRequest {Request} to {MemberId}", gossipRequest, gossipRequest.MemberId);
@@ -216,6 +216,6 @@ private void SendGossipForMember(IContext context, Member targetMember,
216216
Logger.LogInformation("Sending GossipRequest {Request} to {MemberId}", gossipRequest, targetMember.Id);
217217
}
218218

219-
GossipSender.Send(context, context.Cluster(), targetMember, memberStateDelta, gossipRequest, _gossipRequestTimeout, _transport);
219+
GossipSender.Send(context, _memberList, targetMember, memberStateDelta, gossipRequest, _gossipRequestTimeout, _transport);
220220
}
221221
}

src/Proto.Cluster/Gossip/GossipSender.cs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
using Microsoft.Extensions.Logging;
1111
using Proto;
1212
using Proto.Logging;
13+
using Proto.Cluster;
1314

1415
namespace Proto.Cluster.Gossip;
1516

@@ -19,7 +20,7 @@ internal static class GossipSender
1920

2021
public static void Send(
2122
IContext context,
22-
Cluster cluster,
23+
IMemberList memberList,
2324
Member targetMember,
2425
MemberStateDelta memberStateDelta,
2526
GossipRequest request,
@@ -36,9 +37,9 @@ public static void Send(
3637
async task =>
3738
{
3839
var delta = DateTime.UtcNow - start;
39-
var self = cluster.MemberList.Self;
40+
var self = memberList.Self;
4041

41-
if (!cluster.MemberList.TryGetMember(targetMember.Id, out _))
42+
if (!memberList.TryGetMember(targetMember.Id, out _))
4243
{
4344
return;
4445
}
@@ -72,7 +73,7 @@ public static void Send(
7273
}
7374
catch (Exception x)
7475
{
75-
if (cluster.MemberList.TryGetMember(targetMember.Id, out _))
76+
if (memberList.TryGetMember(targetMember.Id, out _))
7677
{
7778
Logger.LogError(
7879
x,

src/Proto.Cluster/Gossip/Gossiper.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -204,7 +204,9 @@ internal Task StartGossipActorAsync(IGossip? gossip = null, IGossipTransport? tr
204204
_cluster.Config.GossipFanout,
205205
_cluster.Config.GossipMaxSend,
206206
_gossip,
207-
transport ?? new GossipTransport()));
207+
transport ?? new GossipTransport(),
208+
_cluster.MemberList,
209+
_cluster.System.Remote().BlockList));
208210

209211
_pid = _context.SpawnNamedSystem(props, GossipActorName);
210212
_cluster.System.EventStream.Subscribe<ClusterTopology>(topology =>
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
namespace Proto.Cluster;
2+
3+
public interface IMemberList
4+
{
5+
bool ContainsMemberId(string memberId);
6+
bool TryGetMember(string memberId, out Member? value);
7+
Member Self { get; }
8+
}
9+

src/Proto.Cluster/Membership/MemberList.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ namespace Proto.Cluster;
2727
/// If the member learns that it is blocked from gossip, it will initiate shutdown.
2828
/// </summary>
2929
[PublicAPI]
30-
public record MemberList
30+
public record MemberList : IMemberList
3131
{
3232
#pragma warning disable CS0618 // Type or member is obsolete
3333
private static readonly ILogger Logger = Log.CreateLogger<MemberList>();

src/Proto.Remote/BlockList.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ public record MemberBlocked(string MemberId, string Reason);
1818
/// unresponsiveness. Entries on this list expire after <see cref="ActorSystemConfig.BlockedMemberDuration"/>,
1919
/// defaults to 1 hour.
2020
/// </summary>
21-
public class BlockList
21+
public class BlockList : IBlockList
2222
{
2323
private readonly object _lock = new();
2424
private readonly ActorSystem _system;

src/Proto.Remote/IBlockList.cs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
using System.Collections.Immutable;
2+
3+
namespace Proto.Remote;
4+
5+
public interface IBlockList
6+
{
7+
ImmutableHashSet<string> BlockedMembers { get; }
8+
}
9+

tests/Proto.Cluster.Tests/GossipTransportTests.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ public async Task Success_should_commit_offsets()
5454

5555
var props = Props.FromFunc(ctx =>
5656
{
57-
GossipSender.Send(ctx, cluster, targetMember, delta, request, cluster.Config.GossipRequestTimeout, transport);
57+
GossipSender.Send(ctx, cluster.MemberList, targetMember, delta, request, cluster.Config.GossipRequestTimeout, transport);
5858
return Task.CompletedTask;
5959
});
6060

@@ -87,7 +87,7 @@ public async Task Rejection_should_not_commit_offsets()
8787

8888
var props = Props.FromFunc(ctx =>
8989
{
90-
GossipSender.Send(ctx, cluster, targetMember, delta, request, cluster.Config.GossipRequestTimeout, transport);
90+
GossipSender.Send(ctx, cluster.MemberList, targetMember, delta, request, cluster.Config.GossipRequestTimeout, transport);
9191
return Task.CompletedTask;
9292
});
9393

@@ -119,7 +119,7 @@ public async Task Timeout_should_not_commit_offsets()
119119

120120
var props = Props.FromFunc(ctx =>
121121
{
122-
GossipSender.Send(ctx, cluster, targetMember, delta, request, cluster.Config.GossipRequestTimeout, transport);
122+
GossipSender.Send(ctx, cluster.MemberList, targetMember, delta, request, cluster.Config.GossipRequestTimeout, transport);
123123
return Task.CompletedTask;
124124
});
125125

@@ -151,7 +151,7 @@ public async Task DeadLetter_should_not_commit_offsets()
151151

152152
var props = Props.FromFunc(ctx =>
153153
{
154-
GossipSender.Send(ctx, cluster, targetMember, delta, request, cluster.Config.GossipRequestTimeout, transport);
154+
GossipSender.Send(ctx, cluster.MemberList, targetMember, delta, request, cluster.Config.GossipRequestTimeout, transport);
155155
return Task.CompletedTask;
156156
});
157157

0 commit comments

Comments
 (0)