Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion src/Proto.Cluster/Identity/IdentityMetrics.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,19 @@ public static class IdentityMetrics
"protocluster_identity_try_acquire_lock_duration", "seconds",
"Time spent trying to acquire the global lock for cluster kind from identity storage"
);
}

public static readonly Counter<long> ActivationRequestSentCount = ProtoMetrics.Meter.CreateCounter<long>(
"protocluster_identity_activation_request_sent_count",
description: "Number of activation requests sent by identity lookup providers"
);

public static readonly Counter<long> ActivationRequestReceivedCount = ProtoMetrics.Meter.CreateCounter<long>(
"protocluster_activator_activation_request_received_count",
description: "Number of activation requests received by activation actors"
);

public static readonly Counter<long> ActivationRequestForwardedCount = ProtoMetrics.Meter.CreateCounter<long>(
"protocluster_activator_activation_request_forwarded_count",
description: "Number of activation requests forwarded by activation actors"
);
}
8 changes: 8 additions & 0 deletions src/Proto.Cluster/Identity/IdentityStoragePlacementActor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,14 @@ private async Task OnActivationTerminating(IContext context, ActivationTerminati

private async Task OnActivationRequest(IContext context, ActivationRequest msg)
{
if (context.System.Metrics.Enabled)
{
IdentityMetrics.ActivationRequestReceivedCount.Add(1,
new KeyValuePair<string, object?>("id", context.System.Id),
new KeyValuePair<string, object?>("address", context.System.Address),
new KeyValuePair<string, object?>("clusterkind", msg.Kind));
}

if (_actors.TryGetValue(msg.ClusterIdentity, out var existing))
{
//this identity already exists
Expand Down
8 changes: 8 additions & 0 deletions src/Proto.Cluster/Identity/IdentityStorageWorker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,14 @@ async Task<PidResult> Inner()
RequestId = spawnLock.LockId
};

if (_cluster.System.Metrics.Enabled)
{
IdentityMetrics.ActivationRequestSentCount.Add(1,
new KeyValuePair<string, object?>("id", _cluster.System.Id),
new KeyValuePair<string, object?>("address", _cluster.System.Address),
new KeyValuePair<string, object?>("clusterkind", spawnLock.ClusterIdentity.Kind));
}

try
{
var resp = await _cluster.System.Root.RequestAsync<ActivationResponse>(remotePid, req, ct).ConfigureAwait(false);
Expand Down
9 changes: 9 additions & 0 deletions src/Proto.Cluster/Partition/PartitionIdentityActor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Proto.Cluster.Identity;

namespace Proto.Cluster.Partition;

Expand Down Expand Up @@ -751,6 +752,14 @@ private async Task<ActivationResponse> SpawnRemoteActor(IContext context, Activa
var timeout = _cluster.Config.ActorActivationTimeout;
var activatorPid = PartitionManager.RemotePartitionPlacementActor(activatorAddress);

if (context.System.Metrics.Enabled)
{
IdentityMetrics.ActivationRequestSentCount.Add(1,
new KeyValuePair<string, object?>("id", context.System.Id),
new KeyValuePair<string, object?>("address", context.System.Address),
new KeyValuePair<string, object?>("clusterkind", req.Kind));
}

var res = await context.RequestAsync<ActivationResponse>(activatorPid, req, timeout).ConfigureAwait(false);

return res;
Expand Down
1 change: 1 addition & 0 deletions src/Proto.Cluster/Partition/PartitionIdentityLookup.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
// -----------------------------------------------------------------------

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
Expand Down
9 changes: 9 additions & 0 deletions src/Proto.Cluster/Partition/PartitionPlacementActor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
using Microsoft.Extensions.Logging;
using Proto.Logging;
using Proto.Utils;
using Proto.Cluster.Identity;

namespace Proto.Cluster.Partition;

Expand Down Expand Up @@ -336,6 +337,14 @@ private Props AbortOnDeadLetter(CancellationTokenSource cts) =>

private async Task OnActivationRequest(IContext context, ActivationRequest msg)
{
if (context.System.Metrics.Enabled)
{
IdentityMetrics.ActivationRequestReceivedCount.Add(1,
new KeyValuePair<string, object?>("id", context.System.Id),
new KeyValuePair<string, object?>("address", context.System.Address),
new KeyValuePair<string, object?>("clusterkind", msg.Kind));
}

if (_actors.TryGetValue(msg.ClusterIdentity, out var existing))
{
if (Logger.IsEnabled(LogLevel.Debug))
Expand Down
17 changes: 17 additions & 0 deletions src/Proto.Cluster/PartitionActivator/PartitionActivatorActor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Proto.Utils;
using Proto.Cluster.Identity;

namespace Proto.Cluster.PartitionActivator;

Expand Down Expand Up @@ -165,11 +166,27 @@ private async Task OnActivationRequest(ActivationRequest msg, IContext context)
Logger.LogWarning("[PartitionActivator] Tried to spawn on wrong node, forwarding");
}

if (context.System.Metrics.Enabled)
{
IdentityMetrics.ActivationRequestForwardedCount.Add(1,
new KeyValuePair<string, object?>("id", context.System.Id),
new KeyValuePair<string, object?>("address", context.System.Address),
new KeyValuePair<string, object?>("clusterkind", msg.Kind));
}

context.Forward(ownerPid);

return;
}

if (context.System.Metrics.Enabled)
{
IdentityMetrics.ActivationRequestReceivedCount.Add(1,
new KeyValuePair<string, object?>("id", context.System.Id),
new KeyValuePair<string, object?>("address", context.System.Address),
new KeyValuePair<string, object?>("clusterkind", msg.Kind));
}

if (_actors.TryGetValue(msg.ClusterIdentity, out var existing))
{
context.Respond(new ActivationResponse
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
// -----------------------------------------------------------------------

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
Expand Down Expand Up @@ -58,6 +59,14 @@ public PartitionActivatorLookup(TimeSpan getPidTimeout, Func<Props, Props>? conf
ClusterIdentity = clusterIdentity
};

if (_cluster.System.Metrics.Enabled)
{
IdentityMetrics.ActivationRequestSentCount.Add(1,
new KeyValuePair<string, object?>("id", _cluster.System.Id),
new KeyValuePair<string, object?>("address", _cluster.System.Address),
new KeyValuePair<string, object?>("clusterkind", clusterIdentity.Kind));
}

if (Logger.IsEnabled(LogLevel.Debug))
{
Logger.LogDebug("[PartitionActivator] Requesting remote PID from {Partition}:{Remote} {@Request}", owner,
Expand Down
9 changes: 9 additions & 0 deletions src/Proto.Cluster/SingleNode/SingleNodeActivatorActor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
using System.Linq;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Proto.Cluster.Identity;

namespace Proto.Cluster.SingleNode;

Expand Down Expand Up @@ -116,6 +117,14 @@ private Task OnActivationTerminating(ActivationTerminating msg)

private async Task OnActivationRequest(ActivationRequest msg, IContext context)
{
if (context.System.Metrics.Enabled)
{
IdentityMetrics.ActivationRequestReceivedCount.Add(1,
new KeyValuePair<string, object?>("id", context.System.Id),
new KeyValuePair<string, object?>("address", context.System.Address),
new KeyValuePair<string, object?>("clusterkind", msg.Kind));
}

if (_actors.TryGetValue(msg.ClusterIdentity, out var existing))
{
context.Respond(new ActivationResponse
Expand Down
9 changes: 9 additions & 0 deletions src/Proto.Cluster/SingleNode/SingleNodeLookup.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
// -----------------------------------------------------------------------

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
Expand Down Expand Up @@ -43,6 +44,14 @@ public SingleNodeLookup(TimeSpan getPidTimeout)
ClusterIdentity = clusterIdentity
};

if (_cluster.System.Metrics.Enabled)
{
IdentityMetrics.ActivationRequestSentCount.Add(1,
new KeyValuePair<string, object?>("id", _cluster.System.Id),
new KeyValuePair<string, object?>("address", _cluster.System.Address),
new KeyValuePair<string, object?>("clusterkind", clusterIdentity.Kind));
}

try
{
var resp = await _cluster.System.Root.RequestAsync<ActivationResponse>(_activatorActor, req, cts.Token).ConfigureAwait(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.Metrics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
Expand All @@ -16,6 +17,8 @@
using Proto.Cluster.Identity;
using Proto.Cluster.Partition;
using Proto.Cluster.Tests;
using Proto;
using Proto.Metrics;
using Xunit;
using Xunit.Abstractions;

Expand Down Expand Up @@ -48,6 +51,35 @@ PartitionIdentityLookup.Send send
{
const int memberCount = 3;

var activationRequestsSent = 0L;
var activationRequestsReceived = 0L;
using var listener = new MeterListener
{
InstrumentPublished = (instrument, l) =>
{
if (instrument.Meter.Name == ProtoMetrics.MeterName &&
(instrument.Name == "protocluster_identity_activation_request_sent_count" ||
instrument.Name == "protocluster_activator_activation_request_received_count"))
{
l.EnableMeasurementEvents(instrument);
}
}
};

listener.SetMeasurementEventCallback<long>((instrument, measurement, _, _) =>
{
if (instrument.Name == "protocluster_identity_activation_request_sent_count")
{
Interlocked.Add(ref activationRequestsSent, measurement);
}
else if (instrument.Name == "protocluster_activator_activation_request_received_count")
{
Interlocked.Add(ref activationRequestsReceived, measurement);
}
});

listener.Start();

Interlocked.Exchange(ref _requests, 0);
var fixture = await InitClusterFixture(memberCount, mode, send);
await using var __ = fixture;
Expand Down Expand Up @@ -93,19 +125,25 @@ PartitionIdentityLookup.Send send

var totalCalls = actorStates.Select(it => it.TotalCount).Sum();
var restarts = actorStates.Select(it => it.Events.Count(e => e is ActorStopped) - 1).Sum();
var totalActivationRequests =
actorStates.Select(it => it.Events.Count(e => e is ActivationRequested)).Sum();
var totalStarts = actorStates.Select(it => it.Events.Count(e => e is ActorStarted)).Sum();

var sentActivationRequests = activationRequestsSent;
var receivedActivationRequests = activationRequestsReceived;

_output.WriteLine(
$"{totalCalls} requests, {restarts} restarts, {totalActivationRequests} activation requests against " +
$"{totalCalls} requests, {restarts} restarts, {receivedActivationRequests} activation requests against " +
actorStates.Count + " identities");
_output.WriteLine($"{sentActivationRequests} activation requests sent by identity lookups");

// Ensure every activation request sent by lookups was handled by an activator
sentActivationRequests.Should().Be(receivedActivationRequests);

// Some activation requests may target actors that are already running
// so the number of received requests can exceed actual actor starts
receivedActivationRequests.Should().BeGreaterOrEqualTo(totalStarts);

foreach (var actorState in actorStates)
{
var activationReqs = actorState.Events.Count(e => e is ActivationRequested);
var starts = actorState.Events.Count(e => e is ActorStarted);
activationReqs.Should().Be(starts);

if (actorState.Inconsistent)
{
Assert.False(actorState.Inconsistent, actorState.ToString());
Expand Down Expand Up @@ -299,6 +337,9 @@ public PartitionIdentityClusterFixture(
_chunkSize = chunkSize;
}

protected override ActorSystemConfig GetActorSystemConfig() =>
base.GetActorSystemConfig().WithMetrics();

protected override ClusterKind[] ClusterKinds
=> new[]
{
Expand All @@ -315,14 +356,5 @@ protected override IIdentityLookup GetIdentityLookup(string clusterName) =>
RebalanceRequestTimeout = TimeSpan.FromSeconds(3),
Mode = _mode,
Send = _send
},
props => props.WithReceiverMiddleware(next => async (ctx, env) =>
{
if (env.Message is ActivationRequest req)
{
Repository.Get(req.Identity, this).RecordActivationRequest(ctx.System.Id);
}

await next(ctx, env);
}));
});
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
<ItemGroup>
<ProjectReference Include="..\Proto.Cluster.Tests\Proto.Cluster.Tests.csproj" />
<ProjectReference Include="..\Proto.TestFixtures\Proto.TestFixtures.csproj" />
<ProjectReference Include="..\..\src\Proto.Actor\Proto.Actor.csproj" />
</ItemGroup>

</Project>
Loading