Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 23 additions & 56 deletions src/BootstrapBlazor/Services/ThrottleDispatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,13 @@ namespace BootstrapBlazor.Components;
/// </summary>
public class ThrottleDispatcher(ThrottleOptions options)
{
private readonly object _locker = new();
private Task? _lastTask;
private DateTime? _invokeTime;
private bool _busy;

/// <summary>
/// 判断是否等待方法
/// </summary>
/// <returns></returns>
protected virtual bool ShouldWait() => _busy || _invokeTime.HasValue && (DateTime.UtcNow - _invokeTime.Value) < options.Interval;
protected virtual bool ShouldWait() => _invokeTime.HasValue && (DateTime.UtcNow - _invokeTime.Value) < options.Interval;

/// <summary>
/// 异步限流方法
Expand All @@ -32,83 +29,53 @@ public class ThrottleDispatcher(ThrottleOptions options)
/// 同步限流方法
/// </summary>
/// <param name="action">同步回调方法</param>
/// <param name="cancellationToken">取消令牌</param>
public void Throttle(Action action, CancellationToken cancellationToken = default)
/// <param name="token">取消令牌</param>
public void Throttle(Action action, CancellationToken token = default)
{
var task = InternalThrottleAsync(() => Task.Run(() =>
{
action();
return Task.CompletedTask;
}, cancellationToken), cancellationToken);
Wait();
return;

[ExcludeFromCodeCoverage]
void Wait()
}, CancellationToken.None), token);
try
{
try
{
task.Wait(cancellationToken);
}
catch (AggregateException ex)
{
if (ex.InnerException is not null)
{
throw ex.InnerException;
}
}
catch (Exception)
{
throw;
}
task.Wait(token);
}
catch (Exception)
{
throw;
}
return;
}

/// <summary>
/// 任务实例
/// </summary>
protected Task LastTask => _lastTask ?? Task.CompletedTask;

/// <summary>
/// 限流异步方法
/// </summary>
/// <param name="function">异步回调方法</param>
/// <param name="cancellationToken">取消令牌</param>
private Task InternalThrottleAsync(Func<Task> function, CancellationToken cancellationToken = default)
private async Task InternalThrottleAsync(Func<Task> function, CancellationToken cancellationToken = default)
{
if (ShouldWait())
{
return LastTask;
return;
}

lock (_locker)
_invokeTime = DateTime.UtcNow;

try
{
if (ShouldWait())
await function();
if (options.DelayAfterExecution)
{
return LastTask;
_invokeTime = DateTime.UtcNow;
}

_busy = true;
_invokeTime = DateTime.UtcNow;
_lastTask = function();
_lastTask.ContinueWith(_ =>
{
if (options.DelayAfterExecution)
{
_invokeTime = DateTime.UtcNow;
}
_busy = false;
}, cancellationToken);

}
catch
{
if (options.ResetIntervalOnException)
{
_lastTask.ContinueWith((_, _) =>
{
_lastTask = null;
_invokeTime = null;
}, cancellationToken, TaskContinuationOptions.OnlyOnFaulted);
_invokeTime = null;
}
return LastTask;
}
}
}
70 changes: 38 additions & 32 deletions test/UnitTest/Services/ThrottleTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -81,19 +81,21 @@ public async Task ResetIntervalOnException_Ok()
var dispatcher = factory.GetOrCreate("Error", new ThrottleOptions() { ResetIntervalOnException = true });

var count = 0;
await Assert.ThrowsAnyAsync<InvalidOperationException>(() => dispatcher.ThrottleAsync(() =>
await dispatcher.ThrottleAsync(() =>
{
count++;
throw new InvalidOperationException();
}));
throw new Exception();
});
Assert.Equal(1, count);

Assert.ThrowsAny<InvalidOperationException>(() => dispatcher.Throttle(() => throw new InvalidOperationException()));
dispatcher.Throttle(() => throw new InvalidOperationException());

// 发生错误后可以立即执行下一次任务,不限流
dispatcher.Throttle(() =>
{
count++;
});
Assert.Equal(2, count);
}

[Fact]
Expand All @@ -104,16 +106,21 @@ public async Task Cancel_Ok()

var cts = new CancellationTokenSource();
cts.Cancel();
Assert.ThrowsAny<OperationCanceledException>(() => dispatcher.Throttle(async () =>
var ex = await Assert.ThrowsAsync<OperationCanceledException>(() =>
{
await Task.Delay(300);
}, cts.Token));
dispatcher.Throttle(() =>
{

}, cts.Token);
return Task.CompletedTask;
});
Assert.NotNull(ex);

cts = new CancellationTokenSource(100);
await Assert.ThrowsAnyAsync<OperationCanceledException>(() => dispatcher.ThrottleAsync(async () =>
await dispatcher.ThrottleAsync(async () =>
{
await Task.Delay(300);
}, cts.Token));
}, cts.Token);
}

[Fact]
Expand All @@ -125,38 +132,37 @@ public void Clear()
factory.Clear("Clear");
}

[Fact]
public void LatTask_Ok()
{
var dispatch = new MockDispatcher(new ThrottleOptions());
Assert.NotNull(dispatch.TestLastTask());
}

[Fact]
public void ShouldWait_Ok()
{
var dispatch = new MockDispatcher(new ThrottleOptions());
var dispatch = new ThrottleDispatcher(new ThrottleOptions());
var count = 0;
dispatch.Throttle(() => count++);
Assert.Equal(0, count);
Assert.Equal(1, count);
dispatch.Throttle(() => count++);
Assert.Equal(1, count);
}

class MockDispatcher(ThrottleOptions options) : ThrottleDispatcher(options)
[Fact]
public async Task MultipleThread_ThrottleAsync_Ok()
{
public Task TestLastTask()
var count = 0;
var dispatch = new ThrottleDispatcher(new ThrottleOptions()
{
return LastTask;
}

private int count = 0;

/// <summary>
/// <inheritdoc/>
/// </summary>
/// <returns></returns>
protected override bool ShouldWait()
Interval = TimeSpan.FromMilliseconds(100),
DelayAfterExecution = true
});
var tasks = Enumerable.Range(1, 2).Select(i => dispatch.ThrottleAsync(() =>
{
return count++ == 1;
}
count++;
return Task.CompletedTask;
})).ToList();
tasks.Add(dispatch.ThrottleAsync(async () =>
{
await Task.Delay(120);
count++;
}));
await Task.WhenAll(tasks);
Assert.Equal(1, count);
}
}
Loading