Skip to content

Commit b3a646b

Browse files
Handle infinite looping when encountering a misconfigured load balancer (#121)
* Handle misconfigured load balancer * Add logic to handle misconfigured load balancer * Update LookupConnection to use maxAttempts as an argument Signed-off-by: Gabriele Santomaggio <[email protected]> Co-authored-by: Gabriele Santomaggio <[email protected]>
1 parent 82ab5ff commit b3a646b

File tree

2 files changed

+67
-9
lines changed

2 files changed

+67
-9
lines changed

RabbitMQ.Stream.Client/RoutingClient.cs

Lines changed: 32 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ public interface IRouting
1919

2020
public class Routing : IRouting
2121
{
22-
2322
public bool ValidateDns { get; set; } = true;
2423

2524
public IClient CreateClient(ClientParameters clientParameters)
@@ -39,8 +38,11 @@ public IClient CreateClient(ClientParameters clientParameters)
3938
/// </summary>
4039
public static class RoutingHelper<T> where T : IRouting, new()
4140
{
42-
private static async Task<IClient> LookupConnection(ClientParameters clientParameters,
43-
Broker broker)
41+
private static async Task<IClient> LookupConnection(
42+
ClientParameters clientParameters,
43+
Broker broker,
44+
int maxAttempts
45+
)
4446
{
4547
var routing = new T();
4648

@@ -80,29 +82,49 @@ string GetPropertyValue(IDictionary<string, string> connectionProperties, string
8082
var advertisedHost = GetPropertyValue(client.ConnectionProperties, "advertised_host");
8183
var advertisedPort = GetPropertyValue(client.ConnectionProperties, "advertised_port");
8284

83-
while (broker.Host != advertisedHost ||
84-
broker.Port != uint.Parse(advertisedPort))
85+
var attemptNo = 0;
86+
while (broker.Host != advertisedHost || broker.Port != uint.Parse(advertisedPort))
8587
{
86-
await client.Close("advertised_host or advertised_port doesn't mach");
88+
attemptNo++;
89+
await client.Close("advertised_host or advertised_port doesn't match");
8790

8891
client = routing.CreateClient(clientParameters with { Endpoint = endPoint });
8992

9093
advertisedHost = GetPropertyValue(client.ConnectionProperties, "advertised_host");
9194
advertisedPort = GetPropertyValue(client.ConnectionProperties, "advertised_port");
92-
// TODO: Maybe an external parameter
95+
if (attemptNo > maxAttempts)
96+
{
97+
throw new RoutingClientException(
98+
$"Could not find broker ({broker.Host}:{broker.Port}) after {maxAttempts} attempts");
99+
}
100+
93101
Thread.Sleep(TimeSpan.FromMilliseconds(200));
94102
}
95103

96104
return client;
97105
}
98106

107+
private static int MaxAttempts(StreamInfo metaDataInfo)
108+
{
109+
// Here we have a reasonable number of retry.
110+
// based on the stream configuration
111+
// It will retry to the same node more than one time
112+
// to be sure that there is not some temp fail
113+
return (int)Math.Pow(2
114+
+
115+
1 // The leader node
116+
+
117+
metaDataInfo.Replicas.Count, // Replicas
118+
2); // Pow just to be sure that the LoadBalancer will ping all the nodes
119+
}
120+
99121
/// <summary>
100122
/// Gets the leader connection. The producer must connect to the leader.
101123
/// </summary>
102124
public static async Task<IClient> LookupLeaderConnection(ClientParameters clientParameters,
103125
StreamInfo metaDataInfo)
104126
{
105-
return await LookupConnection(clientParameters, metaDataInfo.Leader);
127+
return await LookupConnection(clientParameters, metaDataInfo.Leader, MaxAttempts(metaDataInfo));
106128
}
107129

108130
/// <summary>
@@ -121,7 +143,8 @@ public static async Task<IClient> LookupRandomConnection(ClientParameters client
121143
var rnd = new Random();
122144
var brokerId = rnd.Next(0, brokers.Count);
123145
var broker = brokers[brokerId];
124-
return await LookupConnection(clientParameters, broker);
146+
147+
return await LookupConnection(clientParameters, broker, MaxAttempts(metaDataInfo));
125148
}
126149
}
127150

Tests/UnitTests.cs

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,26 @@ public IClient CreateClient(ClientParameters clientParameters)
5959
public bool ValidateDns { get; set; } = false;
6060
}
6161

62+
public class MisconfiguredLoadBalancerRouting : IRouting
63+
{
64+
65+
public IClient CreateClient(ClientParameters clientParameters)
66+
{
67+
68+
var fake = new FakeClient(clientParameters)
69+
{
70+
ConnectionProperties = new Dictionary<string, string>()
71+
{
72+
["advertised_host"] = "node4",
73+
["advertised_port"] = "5552"
74+
}
75+
};
76+
return fake;
77+
}
78+
79+
public bool ValidateDns { get; set; } = false;
80+
}
81+
6282
//advertised_host is is missed
6383
public class MissingFieldsRouting : IRouting
6484
{
@@ -132,6 +152,21 @@ public void AddressResolverLoadBalancerSimulate()
132152
}
133153
}
134154

155+
[Fact]
156+
public void RoutingHelperShouldThrowIfLoadBalancerIsMisconfigured()
157+
{
158+
var addressResolver = new AddressResolver(new IPEndPoint(IPAddress.Parse("192.168.10.99"), 5552));
159+
var clientParameters = new ClientParameters()
160+
{
161+
AddressResolver = addressResolver,
162+
};
163+
var metaDataInfo = new StreamInfo("stream", ResponseCode.Ok, new Broker("node2", 5552),
164+
new List<Broker>() { new Broker("replica", 5552) });
165+
166+
Assert.ThrowsAsync<RoutingClientException>(
167+
() => RoutingHelper<MisconfiguredLoadBalancerRouting>.LookupLeaderConnection(clientParameters, metaDataInfo));
168+
}
169+
135170
[Fact]
136171
public void RandomReplicaLeader()
137172
{

0 commit comments

Comments
 (0)