Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions logs/log1755980804.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
## Fix flakiness in CompositeConsensusWorks
- Removed skip from the test to re-enable it.
- Eliminated arbitrary delays and used topology consensus to wait for cluster stability.
- Added polling loop after spawning a member to ensure topology hash updates before asserting.
- Confirmed reliability by running targeted test multiple times and entire test suites.
3 changes: 3 additions & 0 deletions logs/log1756009491.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
- Added ExpectUpdatedTopologyConsensus helper to Proto.TestKit to wait for topology changes without manual polling.
- Updated composite gossip consensus test to use new helper instead of inline loop, improving readability and reducing timing flakiness.
- Exposed Proto.Cluster internals to Proto.TestKit for access to topology consensus check.
5 changes: 5 additions & 0 deletions logs/log1756009684.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
## Summary
- Simplified gossip tests by directly disposing fixtures with `await using` instead of unused `cleanup` variables

## Motivation
- Unused `cleanup` variables were redundant and flagged in review; using `await using` on the fixture itself clarifies intent and removes dead code
3 changes: 3 additions & 0 deletions logs/log1756010588.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# Change Log
- Added dedicated tests for `ExpectUpdatedTopologyConsensus` to verify completion on topology changes and timeout behavior
- Removed redundant topology hash assertion from `GossipTests.CompositeConsensusWorks` to rely solely on the helper
3 changes: 2 additions & 1 deletion src/Proto.Cluster/Properties/AssemblyInfo.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,5 @@
using System.Runtime.CompilerServices;

[assembly: InternalsVisibleTo("Proto.Cluster.Tests")]
[assembly: InternalsVisibleTo("Proto.Premium")]
[assembly: InternalsVisibleTo("Proto.Premium")]
[assembly: InternalsVisibleTo("Proto.TestKit")]
38 changes: 38 additions & 0 deletions src/Proto.TestKit/ClusterTestKitExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
using System;
using System.Threading.Tasks;
using Proto;
using Proto.Cluster;

namespace Proto.TestKit;

/// <summary>
/// Extension methods for cluster-related test helpers.
/// </summary>
public static class ClusterTestKitExtensions
{
/// <summary>
/// Waits until the cluster's topology consensus hash changes.
/// </summary>
/// <param name="cluster">Cluster to monitor.</param>
/// <param name="timeout">Maximum time to wait for the update. Defaults to 20 seconds.</param>
/// <returns>The new topology hash once it differs from the initial hash.</returns>
public static async Task<ulong> ExpectUpdatedTopologyConsensus(this Proto.Cluster.Cluster cluster, TimeSpan? timeout = null)
{
var waitTimeout = timeout ?? TimeSpan.FromSeconds(20);
// Allow each consensus check up to five seconds to complete
var ct = CancellationTokens.FromSeconds(5);

// Capture the initial topology hash
(_, var initialTopologyHash) = await cluster.MemberList.TopologyConsensus(ct).ConfigureAwait(false);
ulong updatedTopologyHash = initialTopologyHash;

await TestKit.AwaitConditionAsync(async () =>
{
(_, updatedTopologyHash) = await cluster.MemberList.TopologyConsensus(ct).ConfigureAwait(false);
return updatedTopologyHash != initialTopologyHash;
}, waitTimeout, $"Topology hash did not change within {waitTimeout}");

return updatedTopologyHash;
}
}

43 changes: 43 additions & 0 deletions tests/Proto.Cluster.Tests/ExpectUpdatedTopologyConsensusTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
using System;
using System.Linq;
using System.Threading.Tasks;
using FluentAssertions;
using Proto.TestKit;
using Proto.Utils;
using Xunit;

namespace Proto.Cluster.Tests;

public class ExpectUpdatedTopologyConsensusTests
{
[Fact]
public async Task Completes_when_topology_changes()
{
await using var clusterFixture = new InMemoryClusterFixture();
await clusterFixture.InitializeAsync();

// Capture the initial topology hash
(_, var initialHash) = await clusterFixture.Members.First()
.MemberList.TopologyConsensus(CancellationTokens.FromSeconds(5));

var updateTask = clusterFixture.Members.First().ExpectUpdatedTopologyConsensus();

await clusterFixture.SpawnMember();

var newHash = await updateTask;

newHash.Should().NotBe(initialHash);
}

[Fact]
public async Task Throws_when_topology_does_not_change()
{
await using var clusterFixture = new InMemoryClusterFixture();
await clusterFixture.InitializeAsync();

var updateTask = clusterFixture.Members.First()
.ExpectUpdatedTopologyConsensus(TimeSpan.FromMilliseconds(200));

await Assert.ThrowsAsync<TimeoutException>(() => updateTask);
}
}
24 changes: 11 additions & 13 deletions tests/Proto.Cluster.Tests/GossipTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
using ClusterTest.Messages;
using FluentAssertions;
using Proto.Cluster.Gossip;
using Proto.TestKit;
using Xunit;
using Xunit.Abstractions;

Expand All @@ -34,8 +35,7 @@ public GossipTests(ITestOutputHelper testOutputHelper)
[Fact]
public async Task CanGetConsensus()
{
var clusterFixture = new InMemoryClusterFixture();
await using var _ = clusterFixture;
await using var clusterFixture = new InMemoryClusterFixture();
await clusterFixture.InitializeAsync();

const string initialValue = "hello consensus";
Expand All @@ -50,17 +50,14 @@ public async Task CanGetConsensus()

}

[Fact(Skip = "Flaky")]
[Fact]
public async Task CompositeConsensusWorks()
{
var timeout = CancellationTokens.FromSeconds(20);
var clusterFixture = new InMemoryClusterFixture();
await using var _ = clusterFixture;
await using var clusterFixture = new InMemoryClusterFixture();
await clusterFixture.InitializeAsync();

// Allow cluster to settle before verifying consensus
await Task.Delay(1000);

// Wait for the cluster to reach topology consensus before performing checks
var (consensus, initialTopologyHash) =
await clusterFixture.Members.First().MemberList.TopologyConsensus(timeout);

Expand All @@ -83,8 +80,11 @@ public async Task CompositeConsensusWorks()

afterSettingMatchingState.value.Should().Be(initialTopologyHash);

var updatedTopology = clusterFixture.Members.First().ExpectUpdatedTopologyConsensus();

await clusterFixture.SpawnMember();
await Task.Delay(2000); // Allow topology state to propagate

await updatedTopology;

var afterChangingTopology =
await firstNodeCheck.TryGetConsensus(TimeSpan.FromMilliseconds(500), timeout);
Expand All @@ -96,8 +96,7 @@ public async Task CompositeConsensusWorks()
[Fact]
public async Task CanFallOutOfConsensus()
{
var clusterFixture = new InMemoryClusterFixture();
await using var _ = clusterFixture;
await using var clusterFixture = new InMemoryClusterFixture();
await clusterFixture.InitializeAsync();

const string initialValue = "hello consensus";
Expand Down Expand Up @@ -146,8 +145,7 @@ public async Task Gossip_should_replicate_large_state_with_small_batches()
const int maxSend = 2;
const int keysPerMember = 5;

var clusterFixture = new GossipClusterFixture(memberCount, fanout, maxSend);
await using var _ = clusterFixture;
await using var clusterFixture = new GossipClusterFixture(memberCount, fanout, maxSend);
await clusterFixture.InitializeAsync();

var expected = clusterFixture.Members.ToDictionary(
Expand Down
Loading