Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 11 additions & 8 deletions src/Proto.Cluster/Partition/PartitionIdentityLookup.cs
Original file line number Diff line number Diff line change
Expand Up @@ -49,25 +49,28 @@ public enum Send
private static readonly ILogger Logger = Log.CreateLogger<PartitionIdentityLookup>();
private readonly PartitionConfig _config;
private readonly TimeSpan _getPidTimeout;
private readonly Func<Props, Props>? _configurePlacementProps;
private Cluster _cluster = null!;
private PartitionManager _partitionManager = null!;

public PartitionIdentityLookup(TimeSpan identityHandoverTimeout, TimeSpan getPidTimeout) : this(new PartitionConfig
{
GetPidTimeout = getPidTimeout,
RebalanceRequestTimeout = identityHandoverTimeout
})
public PartitionIdentityLookup(TimeSpan identityHandoverTimeout, TimeSpan getPidTimeout, Func<Props, Props>? configurePlacementProps = null)
: this(new PartitionConfig
{
GetPidTimeout = getPidTimeout,
RebalanceRequestTimeout = identityHandoverTimeout
}, configurePlacementProps)
{
}

public PartitionIdentityLookup() : this(new PartitionConfig())
public PartitionIdentityLookup(Func<Props, Props>? configurePlacementProps = null) : this(new PartitionConfig(), configurePlacementProps)
{
}

public PartitionIdentityLookup(PartitionConfig? config)
public PartitionIdentityLookup(PartitionConfig? config, Func<Props, Props>? configurePlacementProps = null)
{
_config = config ?? new PartitionConfig();
_getPidTimeout = _config.GetPidTimeout;
_configurePlacementProps = configurePlacementProps;
}

public async Task<PID?> GetAsync(ClusterIdentity clusterIdentity, CancellationToken notUsed)
Expand Down Expand Up @@ -189,7 +192,7 @@ public Task RemovePidAsync(ClusterIdentity clusterIdentity, PID pid, Cancellatio
public Task SetupAsync(Cluster cluster, string[] kinds, bool isClient)
{
_cluster = cluster;
_partitionManager = new PartitionManager(cluster, isClient, _config);
_partitionManager = new PartitionManager(cluster, isClient, _config, _configurePlacementProps);
_partitionManager.Setup();

return Task.CompletedTask;
Expand Down
11 changes: 10 additions & 1 deletion src/Proto.Cluster/Partition/PartitionManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
// </copyright>
// -----------------------------------------------------------------------

using System;
using System.Linq;
using System.Threading.Tasks;

Expand All @@ -19,16 +20,18 @@ internal class PartitionManager
private readonly IRootContext _context;
private readonly bool _isClient;
private readonly ActorSystem _system;
private readonly Func<Props, Props>? _configurePlacementProps;
private PID _partitionIdentityActor = null!;
private PID _partitionPlacementActor = null!;

internal PartitionManager(Cluster cluster, bool isClient, PartitionConfig config)
internal PartitionManager(Cluster cluster, bool isClient, PartitionConfig config, Func<Props, Props>? configurePlacementProps = null)
{
_cluster = cluster;
_system = cluster.System;
_context = _system.Root;
_isClient = isClient;
_config = config;
_configurePlacementProps = configurePlacementProps;
}

internal PartitionMemberSelector Selector { get; } = new();
Expand Down Expand Up @@ -60,6 +63,12 @@ public void Setup()
_partitionIdentityActor = _context.SpawnNamedSystem(partitionActorProps, PartitionIdentityActorName);

var partitionActivatorProps = Props.FromProducer(() => new PartitionPlacementActor(_cluster, _config));

if (_configurePlacementProps is not null)
{
partitionActivatorProps = _configurePlacementProps(partitionActivatorProps);
}

_partitionPlacementActor = _context.SpawnNamedSystem(partitionActivatorProps, PartitionPlacementActorName);

//synchronous subscribe to keep accurate
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,18 @@ public class PartitionActivatorLookup : IIdentityLookup
{
private static readonly ILogger Logger = Log.CreateLogger<PartitionActivatorLookup>();
private readonly TimeSpan _getPidTimeout;
private readonly Func<Props, Props>? _configureProps;
private Cluster _cluster = null!;
private PartitionActivatorManager _partitionManager = null!;

public PartitionActivatorLookup() : this(TimeSpan.FromSeconds(1))
public PartitionActivatorLookup(Func<Props, Props>? configureProps = null) : this(TimeSpan.FromSeconds(1), configureProps)
{
}

public PartitionActivatorLookup(TimeSpan getPidTimeout)
public PartitionActivatorLookup(TimeSpan getPidTimeout, Func<Props, Props>? configureProps = null)
{
_getPidTimeout = getPidTimeout;
_configureProps = configureProps;
}

public async Task<PID?> GetAsync(ClusterIdentity clusterIdentity, CancellationToken notUsed)
Expand Down Expand Up @@ -116,7 +118,7 @@ public Task RemovePidAsync(ClusterIdentity clusterIdentity, PID pid, Cancellatio
public Task SetupAsync(Cluster cluster, string[] kinds, bool isClient)
{
_cluster = cluster;
_partitionManager = new PartitionActivatorManager(cluster, isClient);
_partitionManager = new PartitionActivatorManager(cluster, isClient, _configureProps);
_partitionManager.Setup();

return Task.CompletedTask;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
// </copyright>
// -----------------------------------------------------------------------

using System;
using System.Linq;
using System.Threading.Tasks;

Expand All @@ -18,14 +19,16 @@ public class PartitionActivatorManager
private readonly IRootContext _context;
private readonly bool _isClient;
private readonly ActorSystem _system;
private readonly Func<Props, Props>? _configureProps;
private PID _partitionActivatorActor = null!;

internal PartitionActivatorManager(Cluster cluster, bool isClient)
internal PartitionActivatorManager(Cluster cluster, bool isClient, Func<Props, Props>? configureProps = null)
{
_cluster = cluster;
_system = cluster.System;
_context = _system.Root;
_isClient = isClient;
_configureProps = configureProps;
}

internal PartitionActivatorSelector Selector { get; } = new();
Expand Down Expand Up @@ -54,6 +57,11 @@ public void Setup()
var partitionActivatorProps =
Props.FromProducer(() => new PartitionActivatorActor(_cluster, this));

if (_configureProps is not null)
{
partitionActivatorProps = _configureProps(partitionActivatorProps);
}

_partitionActivatorActor = _context.SpawnNamedSystem(partitionActivatorProps, PartitionActivatorActorName);

//synchronous subscribe to keep accurate
Expand Down
86 changes: 86 additions & 0 deletions tests/Proto.Cluster.Tests/PartitionMiddlewareTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using ClusterTest.Messages;
using FluentAssertions;
using Proto.Cluster;
using Proto.Cluster.Identity;
using Proto.Cluster.Partition;
using Proto.Cluster.PartitionActivator;
using Xunit;

namespace Proto.Cluster.Tests;

public class PartitionActivatorMiddlewareTests : IClassFixture<PartitionActivatorMiddlewareTests.ActivatorMiddlewareFixture>
{
private readonly ActivatorMiddlewareFixture _fx;

public PartitionActivatorMiddlewareTests(ActivatorMiddlewareFixture fx) => _fx = fx;

[Fact]
public async Task ShouldInvokeActivatorMiddleware()
{
var cluster = _fx.Members[0];
var identity = ClusterIdentity.Create("act", EchoActor.Kind);
await cluster.RequestAsync<Pong>(identity, new Ping(), CancellationToken.None);

_fx.ActivationRequests.Should().BeGreaterThan(0);
}

public class ActivatorMiddlewareFixture : BaseInMemoryClusterFixture
{
public int ActivationRequests;

public ActivatorMiddlewareFixture() : base(1, config => config.WithActorRequestTimeout(TimeSpan.FromSeconds(4)))
{
}

protected override IIdentityLookup GetIdentityLookup(string clusterName) =>
new PartitionActivatorLookup(props => props.WithReceiverMiddleware(next => async (ctx, env) =>
{
if (env.Message is ActivationRequest)
{
Interlocked.Increment(ref ActivationRequests);
}

await next(ctx, env);
}));
}
}

public class PartitionPlacementMiddlewareTests : IClassFixture<PartitionPlacementMiddlewareTests.PlacementMiddlewareFixture>
{
private readonly PlacementMiddlewareFixture _fx;

public PartitionPlacementMiddlewareTests(PlacementMiddlewareFixture fx) => _fx = fx;

[Fact]
public async Task ShouldInvokePlacementMiddleware()
{
var cluster = _fx.Members[0];
var identity = ClusterIdentity.Create("place", EchoActor.Kind);
await cluster.RequestAsync<Pong>(identity, new Ping(), CancellationToken.None);

_fx.ActivationRequests.Should().BeGreaterThan(0);
}

public class PlacementMiddlewareFixture : BaseInMemoryClusterFixture
{
public int ActivationRequests;

public PlacementMiddlewareFixture() : base(1, config => config.WithActorRequestTimeout(TimeSpan.FromSeconds(4)))
{
}

protected override IIdentityLookup GetIdentityLookup(string clusterName) =>
new PartitionIdentityLookup(new PartitionConfig(), props => props.WithReceiverMiddleware(next => async (ctx, env) =>
{
if (env.Message is ActivationRequest)
{
Interlocked.Increment(ref ActivationRequests);
}

await next(ctx, env);
}));
}
}
Loading