diff --git a/Fritz.Chatbot/Commands/TeamCommand.cs b/Fritz.Chatbot/Commands/TeamCommand.cs index 4542b172..73c99d7d 100644 --- a/Fritz.Chatbot/Commands/TeamCommand.cs +++ b/Fritz.Chatbot/Commands/TeamCommand.cs @@ -1,5 +1,6 @@ using Fritz.StreamLib.Core; using Fritz.StreamTools.Hubs; +using Fritz.Chatbot.Helpers; using Microsoft.AspNetCore.SignalR; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.Logging; @@ -11,6 +12,8 @@ using System.Net.Http; using System.Text; using System.Threading.Tasks; +using System.Reactive.Linq; +using System.Reactive.Subjects; namespace Fritz.Chatbot.Commands { @@ -23,6 +26,11 @@ public class TeamCommand : IExtendedCommand private HttpClient _HttpClient; private readonly IHubContext _Context; private ILogger _Logger; + private Subject _TeammateNotifications = new Subject(); + /// + /// Handle to the teammates notification subscription. Dispose to stop the subscription + /// + private IDisposable _subscriptionHandle; public string Name { get; } = "Team Detection"; public string Description { get; } = "Alert when a teammate joins the stream and starts chatting"; @@ -31,8 +39,7 @@ public class TeamCommand : IExtendedCommand public TimeSpan? Cooldown { get; } = TimeSpan.FromSeconds(5); public TimeSpan ShoutoutCooldown; public string ShoutoutFormat; - public Queue _TeammateNotifications = new Queue(); - + public TeamCommand(IConfiguration configuration, ILoggerFactory loggerFactory, IHubContext context, IHttpClientFactory httpClientFactory) { _TeamName = configuration["StreamServices:Twitch:Team"]; @@ -56,24 +63,14 @@ public TeamCommand(IConfiguration configuration, ILoggerFactory loggerFactory, I GetTeammates().GetAwaiter().GetResult(); - Task.Run(SendNotificationsToWidget); - + SubscribeToTeamNotifications(); } - private void SendNotificationsToWidget() - { - - while (true) { - - if (_TeammateNotifications.TryPeek(out var _)) { - - _Context.Clients.All.SendAsync("Teammate", _TeammateNotifications.Dequeue()); - Task.Delay(5000); - - } - - } - + private void SubscribeToTeamNotifications() { + //when notifications appear, throttle them to 1 item per 5 seconds + _subscriptionHandle = _TeammateNotifications.Throttle(1, TimeSpan.FromSeconds(5)).Subscribe(notification => { + _Context.Clients.All.SendAsync("Teammate", notification); + }); } public bool CanExecute(string userName, string fullCommandText) @@ -95,7 +92,7 @@ public async Task Execute(IChatService chatService, string userName, string full await chatService.SendMessageAsync(ShoutoutFormat.Replace("{teammate}", userName)); } - _TeammateNotifications.Enqueue(userName); + _TeammateNotifications.OnNext(userName); } diff --git a/Fritz.Chatbot/Fritz.Chatbot.csproj b/Fritz.Chatbot/Fritz.Chatbot.csproj index acadd68c..10cf5364 100644 --- a/Fritz.Chatbot/Fritz.Chatbot.csproj +++ b/Fritz.Chatbot/Fritz.Chatbot.csproj @@ -22,6 +22,7 @@ + diff --git a/Fritz.Chatbot/Helpers/ObservableExtensions.cs b/Fritz.Chatbot/Helpers/ObservableExtensions.cs new file mode 100644 index 00000000..3ec2db6a --- /dev/null +++ b/Fritz.Chatbot/Helpers/ObservableExtensions.cs @@ -0,0 +1,97 @@ +using System; +using System.Reactive.Linq; +using System.Reactive.Subjects; +using System.Collections.Concurrent; +using System.Threading; + +namespace Fritz.Chatbot.Helpers +{ + public static class ObservableExtensions + { + /// + /// Pass through up to items downstream within given . + /// Once more elements are about to get through they will become buffered, until interval resets. + /// + public static IObservable Throttle(this IObservable source, int count, TimeSpan interval) => + new Throttle(source, count, interval); + } + + /// + /// Custom Throttle implementation, because the default provided with rx.net does not work like we want. + /// code is by @Horusiath Who confirmed that i was indeed not crazy, and that rx does not has an operator OOB + /// that behaves like this. + /// + /// + public class Throttle : IObservable + { + private readonly IObservable _source; + private readonly int _count; + private readonly TimeSpan _interval; + + public Throttle(IObservable source, int count, TimeSpan interval) + { + _source = source; + _count = count; + _interval = interval; + } + + public IDisposable Subscribe(IObserver observer) => + _source.SubscribeSafe(new Observer(observer, _count, _interval)); + + private sealed class Observer : IObserver + { + private readonly IObserver _observer; + private readonly int _count; + private readonly Timer _timer; + private readonly ConcurrentQueue _buffer; + private int _remaining; + + public Observer(IObserver observer, int count, TimeSpan interval) + { + _observer = observer; + _remaining = _count = count; + _buffer = new ConcurrentQueue(); + _timer = new Timer(_ => + { + // first, try to dequeue up to `_count` buffered items + // after that is done, reset `_remaining` quota to what's left + var i = _count; + while (i > 0 && _buffer.TryDequeue(out var value)) + { + i--; + _observer.OnNext(value); + } + + // reset remaining count at the end of the interval + Interlocked.Exchange(ref _remaining, i); + }, null, interval, interval); + } + + public void OnCompleted() + { + // what to do with buffered items? Up to you. + _timer.Dispose(); + _observer.OnCompleted(); + } + + public void OnError(Exception error) + { + _observer.OnError(error); + } + + public void OnNext(T value) + { + if (Interlocked.Decrement(ref _remaining) >= 0) + { + // if we have free quota to spare in this interval, emit value downstream + _observer.OnNext(value); + } + else + { + // otherwise buffer value until timer will reset it + _buffer.Enqueue(value); + } + } + } + } +} \ No newline at end of file