diff --git a/src/Proto.Cluster/PubSub/TopicActor.cs b/src/Proto.Cluster/PubSub/TopicActor.cs index 0b1fe09a52..62f736fac0 100644 --- a/src/Proto.Cluster/PubSub/TopicActor.cs +++ b/src/Proto.Cluster/PubSub/TopicActor.cs @@ -46,7 +46,7 @@ public Task ReceiveAsync(IContext context) => PubSubBatch batch => OnPubSubBatch(context, batch), NotifyAboutFailingSubscribersRequest msg => OnNotifyAboutFailingSubscribers(context, msg), ClusterTopology msg => OnClusterTopologyChanged(context, msg), - DeadLetterResponse msg => OnDeadLetterResponse(msg), + DeadLetterResponse msg => OnDeadLetterResponse(context, msg), _ => Task.CompletedTask }; @@ -55,7 +55,7 @@ private async Task OnStarted(IContext context) _topic = context.Get()!.Identity; _topologySubscription = context.System.EventStream.Subscribe(context, context.Self); - var subs = await LoadSubscriptions(_topic).ConfigureAwait(false); + var subs = await LoadSubscriptions(_topic, context.CancellationToken).ConfigureAwait(false); if (subs.Subscribers_ is not null) { @@ -169,7 +169,7 @@ private static Subscribers GetSubscribersForAddress( private async Task OnNotifyAboutFailingSubscribers(IContext context, NotifyAboutFailingSubscribersRequest msg) { - await UnsubscribeUnreachablePidSubscribers(msg.InvalidDeliveries).ConfigureAwait(false); + await UnsubscribeUnreachablePidSubscribers(msg.InvalidDeliveries, context.CancellationToken).ConfigureAwait(false); LogDeliveryErrors(msg.InvalidDeliveries); context.Respond(new NotifyAboutFailingSubscribersResponse()); @@ -188,7 +188,7 @@ private void LogDeliveryErrors(IReadOnlyCollection all } private async Task UnsubscribeUnreachablePidSubscribers( - IReadOnlyCollection allInvalidDeliveryReports) + IReadOnlyCollection allInvalidDeliveryReports, CancellationToken ct) { var allUnreachable = allInvalidDeliveryReports .Where(r => r is @@ -200,7 +200,7 @@ private async Task UnsubscribeUnreachablePidSubscribers( .Select(s => s.Subscriber) .ToList(); - await RemoveSubscribers(allUnreachable).ConfigureAwait(false); + await RemoveSubscribers(allUnreachable, ct).ConfigureAwait(false); } private async Task OnClusterTopologyChanged(IContext context, ClusterTopology topology) @@ -214,7 +214,7 @@ private async Task OnClusterTopologyChanged(IContext context, ClusterTopology to addressesThatLeft.Contains(s.Pid.Address)) .ToList(); - await RemoveSubscribers(subscribersThatLeft).ConfigureAwait(false); + await RemoveSubscribers(subscribersThatLeft, context.CancellationToken).ConfigureAwait(false); } } @@ -227,10 +227,10 @@ private async Task UnsubscribeSubscribersOnMembersThatLeft(IContext ctx) !activeMemberAddresses.Contains(s.Pid.Address)) .ToList(); - await RemoveSubscribers(subscribersThatLeft).ConfigureAwait(false); + await RemoveSubscribers(subscribersThatLeft, ctx.CancellationToken).ConfigureAwait(false); } - private async Task RemoveSubscribers(IReadOnlyCollection subscribersThatLeft) + private async Task RemoveSubscribers(IReadOnlyCollection subscribersThatLeft, CancellationToken ct) { if (subscribersThatLeft.Count > 0) { @@ -247,16 +247,15 @@ private async Task RemoveSubscribers(IReadOnlyCollection sub string.Join(", ", subscribersThatLeft)); } - await SaveSubscriptions(_topic, new Subscribers { Subscribers_ = { _subscribers } }).ConfigureAwait(false); + await SaveSubscriptions(_topic, new Subscribers { Subscribers_ = { _subscribers } }, ct).ConfigureAwait(false); } } - private async Task LoadSubscriptions(string topic) + private async Task LoadSubscriptions(string topic, CancellationToken ct) { try { - //TODO: cancellation token config? - var state = await _subscriptionStore.GetAsync(topic, CancellationToken.None).ConfigureAwait(false); + var state = await _subscriptionStore.GetAsync(topic, ct).ConfigureAwait(false); Logger.LogDebug("Topic {Topic} loaded subscriptions {Subscriptions}", _topic, state); return state ?? new Subscribers(); @@ -272,13 +271,12 @@ private async Task LoadSubscriptions(string topic) } } - private async Task SaveSubscriptions(string topic, Subscribers subs) + private async Task SaveSubscriptions(string topic, Subscribers subs, CancellationToken ct) { try { - //TODO: cancellation token config? Logger.LogDebug("Topic {Topic} saved subscriptions {Subscriptions}", _topic, subs); - await _subscriptionStore.SetAsync(topic, subs, CancellationToken.None).ConfigureAwait(false); + await _subscriptionStore.SetAsync(topic, subs, ct).ConfigureAwait(false); } catch (Exception e) { @@ -293,7 +291,8 @@ private async Task OnUnsubscribe(IContext context, UnsubscribeRequest unsub) { _subscribers = _subscribers.Remove(unsub.Subscriber); Logger.LogDebug("Topic {Topic} - {Subscriber} unsubscribed", _topic, unsub); - await SaveSubscriptions(_topic, new Subscribers { Subscribers_ = { _subscribers } }).ConfigureAwait(false); + await SaveSubscriptions(_topic, new Subscribers { Subscribers_ = { _subscribers } }, context.CancellationToken) + .ConfigureAwait(false); context.Respond(new UnsubscribeResponse()); } @@ -301,18 +300,22 @@ private async Task OnSubscribe(IContext context, SubscribeRequest sub) { _subscribers = _subscribers.Add(sub.Subscriber); Logger.LogDebug("Topic {Topic} - {Subscriber} subscribed", _topic, sub); - await SaveSubscriptions(_topic, new Subscribers { Subscribers_ = { _subscribers } }).ConfigureAwait(false); + await SaveSubscriptions(_topic, new Subscribers { Subscribers_ = { _subscribers } }, context.CancellationToken) + .ConfigureAwait(false); context.Respond(new SubscribeResponse()); } - private async Task OnDeadLetterResponse(DeadLetterResponse msg) + private async Task OnDeadLetterResponse(IContext context, DeadLetterResponse msg) { - var deadLetterSub = msg.Target == null ? null : _subscribers.FirstOrDefault(s => s.Pid.Address == msg.Target.Address); + var deadLetterSub = msg.Target == null + ? null + : _subscribers.FirstOrDefault(s => s.Pid.Address == msg.Target.Address); if (deadLetterSub != null) { _subscribers = _subscribers.Remove(deadLetterSub); Logger.LogDebug("Topic {Topic} - {Subscriber} unsubscribed due to dead letter", _topic, deadLetterSub); - await SaveSubscriptions(_topic, new Subscribers { Subscribers_ = { _subscribers } }).ConfigureAwait(false); + await SaveSubscriptions(_topic, new Subscribers { Subscribers_ = { _subscribers } }, context.CancellationToken) + .ConfigureAwait(false); } } }