Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 23 additions & 20 deletions src/Proto.Cluster/PubSub/TopicActor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public Task ReceiveAsync(IContext context) =>
PubSubBatch batch => OnPubSubBatch(context, batch),
NotifyAboutFailingSubscribersRequest msg => OnNotifyAboutFailingSubscribers(context, msg),
ClusterTopology msg => OnClusterTopologyChanged(context, msg),
DeadLetterResponse msg => OnDeadLetterResponse(msg),
DeadLetterResponse msg => OnDeadLetterResponse(context, msg),
_ => Task.CompletedTask
};

Expand All @@ -55,7 +55,7 @@ private async Task OnStarted(IContext context)
_topic = context.Get<ClusterIdentity>()!.Identity;
_topologySubscription = context.System.EventStream.Subscribe<ClusterTopology>(context, context.Self);

var subs = await LoadSubscriptions(_topic).ConfigureAwait(false);
var subs = await LoadSubscriptions(_topic, context.CancellationToken).ConfigureAwait(false);

if (subs.Subscribers_ is not null)
{
Expand Down Expand Up @@ -169,7 +169,7 @@ private static Subscribers GetSubscribersForAddress(

private async Task OnNotifyAboutFailingSubscribers(IContext context, NotifyAboutFailingSubscribersRequest msg)
{
await UnsubscribeUnreachablePidSubscribers(msg.InvalidDeliveries).ConfigureAwait(false);
await UnsubscribeUnreachablePidSubscribers(msg.InvalidDeliveries, context.CancellationToken).ConfigureAwait(false);
LogDeliveryErrors(msg.InvalidDeliveries);

context.Respond(new NotifyAboutFailingSubscribersResponse());
Expand All @@ -188,7 +188,7 @@ private void LogDeliveryErrors(IReadOnlyCollection<SubscriberDeliveryReport> all
}

private async Task UnsubscribeUnreachablePidSubscribers(
IReadOnlyCollection<SubscriberDeliveryReport> allInvalidDeliveryReports)
IReadOnlyCollection<SubscriberDeliveryReport> allInvalidDeliveryReports, CancellationToken ct)
{
var allUnreachable = allInvalidDeliveryReports
.Where(r => r is
Expand All @@ -200,7 +200,7 @@ private async Task UnsubscribeUnreachablePidSubscribers(
.Select(s => s.Subscriber)
.ToList();

await RemoveSubscribers(allUnreachable).ConfigureAwait(false);
await RemoveSubscribers(allUnreachable, ct).ConfigureAwait(false);
}

private async Task OnClusterTopologyChanged(IContext context, ClusterTopology topology)
Expand All @@ -214,7 +214,7 @@ private async Task OnClusterTopologyChanged(IContext context, ClusterTopology to
addressesThatLeft.Contains(s.Pid.Address))
.ToList();

await RemoveSubscribers(subscribersThatLeft).ConfigureAwait(false);
await RemoveSubscribers(subscribersThatLeft, context.CancellationToken).ConfigureAwait(false);
}
}

Expand All @@ -227,10 +227,10 @@ private async Task UnsubscribeSubscribersOnMembersThatLeft(IContext ctx)
!activeMemberAddresses.Contains(s.Pid.Address))
.ToList();

await RemoveSubscribers(subscribersThatLeft).ConfigureAwait(false);
await RemoveSubscribers(subscribersThatLeft, ctx.CancellationToken).ConfigureAwait(false);
}

private async Task RemoveSubscribers(IReadOnlyCollection<SubscriberIdentity> subscribersThatLeft)
private async Task RemoveSubscribers(IReadOnlyCollection<SubscriberIdentity> subscribersThatLeft, CancellationToken ct)
{
if (subscribersThatLeft.Count > 0)
{
Expand All @@ -247,16 +247,15 @@ private async Task RemoveSubscribers(IReadOnlyCollection<SubscriberIdentity> sub
string.Join(", ", subscribersThatLeft));
}

await SaveSubscriptions(_topic, new Subscribers { Subscribers_ = { _subscribers } }).ConfigureAwait(false);
await SaveSubscriptions(_topic, new Subscribers { Subscribers_ = { _subscribers } }, ct).ConfigureAwait(false);
}
}

private async Task<Subscribers> LoadSubscriptions(string topic)
private async Task<Subscribers> LoadSubscriptions(string topic, CancellationToken ct)
{
try
{
//TODO: cancellation token config?
var state = await _subscriptionStore.GetAsync(topic, CancellationToken.None).ConfigureAwait(false);
var state = await _subscriptionStore.GetAsync(topic, ct).ConfigureAwait(false);
Logger.LogDebug("Topic {Topic} loaded subscriptions {Subscriptions}", _topic, state);

return state ?? new Subscribers();
Expand All @@ -272,13 +271,12 @@ private async Task<Subscribers> LoadSubscriptions(string topic)
}
}

private async Task SaveSubscriptions(string topic, Subscribers subs)
private async Task SaveSubscriptions(string topic, Subscribers subs, CancellationToken ct)
{
try
{
//TODO: cancellation token config?
Logger.LogDebug("Topic {Topic} saved subscriptions {Subscriptions}", _topic, subs);
await _subscriptionStore.SetAsync(topic, subs, CancellationToken.None).ConfigureAwait(false);
await _subscriptionStore.SetAsync(topic, subs, ct).ConfigureAwait(false);
}
catch (Exception e)
{
Expand All @@ -293,26 +291,31 @@ private async Task OnUnsubscribe(IContext context, UnsubscribeRequest unsub)
{
_subscribers = _subscribers.Remove(unsub.Subscriber);
Logger.LogDebug("Topic {Topic} - {Subscriber} unsubscribed", _topic, unsub);
await SaveSubscriptions(_topic, new Subscribers { Subscribers_ = { _subscribers } }).ConfigureAwait(false);
await SaveSubscriptions(_topic, new Subscribers { Subscribers_ = { _subscribers } }, context.CancellationToken)
.ConfigureAwait(false);
context.Respond(new UnsubscribeResponse());
}

private async Task OnSubscribe(IContext context, SubscribeRequest sub)
{
_subscribers = _subscribers.Add(sub.Subscriber);
Logger.LogDebug("Topic {Topic} - {Subscriber} subscribed", _topic, sub);
await SaveSubscriptions(_topic, new Subscribers { Subscribers_ = { _subscribers } }).ConfigureAwait(false);
await SaveSubscriptions(_topic, new Subscribers { Subscribers_ = { _subscribers } }, context.CancellationToken)
.ConfigureAwait(false);
context.Respond(new SubscribeResponse());
}

private async Task OnDeadLetterResponse(DeadLetterResponse msg)
private async Task OnDeadLetterResponse(IContext context, DeadLetterResponse msg)
{
var deadLetterSub = msg.Target == null ? null : _subscribers.FirstOrDefault(s => s.Pid.Address == msg.Target.Address);
var deadLetterSub = msg.Target == null
? null
: _subscribers.FirstOrDefault(s => s.Pid.Address == msg.Target.Address);
if (deadLetterSub != null)
{
_subscribers = _subscribers.Remove(deadLetterSub);
Logger.LogDebug("Topic {Topic} - {Subscriber} unsubscribed due to dead letter", _topic, deadLetterSub);
await SaveSubscriptions(_topic, new Subscribers { Subscribers_ = { _subscribers } }).ConfigureAwait(false);
await SaveSubscriptions(_topic, new Subscribers { Subscribers_ = { _subscribers } }, context.CancellationToken)
.ConfigureAwait(false);
}
}
}
Loading