Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,10 @@ PartitionIdentityLookup.Send send

_output.WriteLine($"{totalCalls} requests, {restarts} restarts against " + actorStates.Count + " identities");

var activationRequests = actorStates.Sum(it => it.Events.Count(e => e is ActivationRequested));
var actorStarts = actorStates.Sum(it => it.Events.Count(e => e is ActorStarted));
_output.WriteLine($"Activation requests: {activationRequests}, actors started: {actorStarts}");

foreach (var actorState in actorStates)
{
if (actorState.Inconsistent)
Expand Down Expand Up @@ -298,12 +302,15 @@ protected override ClusterKind[] ClusterKinds
};

protected override IIdentityLookup GetIdentityLookup(string clusterName) =>
new PartitionIdentityLookup(new PartitionConfig
{
GetPidTimeout = TimeSpan.FromSeconds(5),
HandoverChunkSize = _chunkSize,
RebalanceRequestTimeout = TimeSpan.FromSeconds(3),
Mode = _mode,
Send = _send
});
new RecordingPartitionIdentityLookup(
new PartitionIdentityLookup(new PartitionConfig
{
GetPidTimeout = TimeSpan.FromSeconds(5),
HandoverChunkSize = _chunkSize,
RebalanceRequestTimeout = TimeSpan.FromSeconds(3),
Mode = _mode,
Send = _send
}),
Repository,
this);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
// -----------------------------------------------------------------------
// <copyright file="RecordingPartitionIdentityLookup.cs" company="Asynkron AB">
// Copyright (C) 2015-2024 Asynkron AB All rights reserved
// </copyright>
// -----------------------------------------------------------------------

using System.Threading;
using System.Threading.Tasks;
using Proto;
using Proto.Cluster.Identity;
using Proto.Cluster.Partition;
using Proto.Cluster.Tests;
using Proto.Diagnostics;

namespace Proto.Cluster.PartitionIdentity.Tests;

/// <summary>
/// Identity lookup wrapper that records activation requests before delegating
/// to the underlying <see cref="PartitionIdentityLookup"/>.
/// </summary>
public class RecordingPartitionIdentityLookup : IIdentityLookup
{
private readonly PartitionIdentityLookup _inner;
private readonly ActorStateRepo _repo;
private readonly IClusterFixture _fixture;

public RecordingPartitionIdentityLookup(PartitionIdentityLookup inner, ActorStateRepo repo, IClusterFixture fixture)
{
_inner = inner;
_repo = repo;
_fixture = fixture;
}

public Task<PID?> GetAsync(ClusterIdentity clusterIdentity, CancellationToken ct)
{
_repo.Get(clusterIdentity.Identity, _fixture).RecordActivationRequest();
return _inner.GetAsync(clusterIdentity, ct);
}

public Task RemovePidAsync(ClusterIdentity clusterIdentity, PID pid, CancellationToken ct)
=> _inner.RemovePidAsync(clusterIdentity, pid, ct);

public Task SetupAsync(Cluster cluster, string[] kinds, bool isClient)
=> _inner.SetupAsync(cluster, kinds, isClient);

public Task ShutdownAsync() => _inner.ShutdownAsync();

public Task<DiagnosticsEntry[]> GetDiagnostics()
=> (_inner as IDiagnosticsProvider).GetDiagnostics();
}
6 changes: 6 additions & 0 deletions tests/Proto.Cluster.Tests/ConcurrencyVerificationActor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,9 @@ public record ActorStarted(string Member, PID Activation, DateTimeOffset When, i
public record ActorStopped(string Member, PID Activation, DateTimeOffset When, int StoredCount, long GlobalCount)
: VerificationEvent(Activation, When);

public record ActivationRequested(DateTimeOffset When)
: VerificationEvent(new PID(string.Empty, string.Empty), When);

public record ConsistencyError(
PID Activation,
DateTimeOffset When,
Expand Down Expand Up @@ -174,6 +177,9 @@ public void RecordStopping(IContext context)
}
}

public void RecordActivationRequest()
=> Events.Add(new ActivationRequested(DateTimeOffset.Now));

// do not verify consistency if any of the current members is blocked (which means they are shutting down)
// in this case we may see duplicated activation, but this is by design and we don't want to report it
// activation count should go back to expected value once the duplicated activation shuts down together with the member
Expand Down
Loading