@@ -19,6 +19,8 @@ public interface ISloContext
1919
2020public abstract class SloTableContext < T > : ISloContext
2121{
22+ private const int IntervalMs = 100 ;
23+
2224 protected static readonly ILogger Logger = ISloContext . Factory . CreateLogger < SloTableContext < T > > ( ) ;
2325
2426 private volatile int _maxId ;
@@ -95,11 +97,13 @@ public async Task Run(RunConfig runConfig)
9597
9698 var writeLimiter = new FixedWindowRateLimiter ( new FixedWindowRateLimiterOptions
9799 {
98- Window = TimeSpan . FromMilliseconds ( 100 ) , PermitLimit = runConfig . WriteRps / 10 , QueueLimit = int . MaxValue
100+ Window = TimeSpan . FromMilliseconds ( IntervalMs ) , PermitLimit = runConfig . WriteRps / 10 ,
101+ QueueLimit = int . MaxValue
99102 } ) ;
100103 var readLimiter = new FixedWindowRateLimiter ( new FixedWindowRateLimiterOptions
101104 {
102- Window = TimeSpan . FromMilliseconds ( 100 ) , PermitLimit = runConfig . ReadRps / 10 , QueueLimit = int . MaxValue
105+ Window = TimeSpan . FromMilliseconds ( IntervalMs ) , PermitLimit = runConfig . ReadRps / 10 ,
106+ QueueLimit = int . MaxValue
103107 } ) ;
104108
105109 var cancellationTokenSource = new CancellationTokenSource ( ) ;
@@ -124,7 +128,7 @@ public async Task Run(RunConfig runConfig)
124128 Logger . LogInformation ( "Run task is finished" ) ;
125129 return ;
126130
127- Task ShootingTask ( RateLimiter rateLimitPolicy , string operationType ,
131+ async Task ShootingTask ( RateLimiter rateLimitPolicy , string operationType ,
128132 Func < T , RunConfig , Task < ( int , StatusCode ) > > action )
129133 {
130134 var metricFactory = Metrics . WithLabels ( new Dictionary < string , string >
@@ -193,21 +197,22 @@ Task ShootingTask(RateLimiter rateLimitPolicy, string operationType,
193197 [ "error_type" ]
194198 ) ;
195199
196- // ReSharper disable once MethodSupportsCancellation
197- return Task . Run ( async ( ) =>
200+ var workJobs = new List < Task > ( ) ;
201+
202+ for ( var i = 0 ; i < 10 ; i ++ )
198203 {
199- while ( ! cancellationTokenSource . Token . IsCancellationRequested )
204+ workJobs . Add ( Task . Run ( async ( ) =>
200205 {
201- using var lease = await rateLimitPolicy
202- . AcquireAsync ( cancellationToken : cancellationTokenSource . Token ) ;
203-
204- if ( ! lease . IsAcquired )
206+ while ( ! cancellationTokenSource . Token . IsCancellationRequested )
205207 {
206- continue ;
207- }
208+ using var lease = await rateLimitPolicy
209+ . AcquireAsync ( cancellationToken : cancellationTokenSource . Token ) ;
210+
211+ if ( ! lease . IsAcquired )
212+ {
213+ await Task . Delay ( Random . Shared . Next ( IntervalMs / 2 ) , cancellationTokenSource . Token ) ;
214+ }
208215
209- _ = Task . Run ( async ( ) =>
210- {
211216 try
212217 {
213218 pendingOperations . Inc ( ) ;
@@ -235,11 +240,14 @@ Task ShootingTask(RateLimiter rateLimitPolicy, string operationType,
235240 {
236241 Logger . LogError ( e , "Fail operation!" ) ;
237242 }
238- } , cancellationTokenSource . Token ) ;
239- }
243+ }
244+ } , cancellationTokenSource . Token ) ) ;
245+ }
246+
247+ // ReSharper disable once MethodSupportsCancellation
248+ await Task . WhenAll ( workJobs ) ;
240249
241- Logger . LogInformation ( "{ShootingName} shooting is stopped" , operationType ) ;
242- } ) ;
250+ Logger . LogInformation ( "{ShootingName} shooting is stopped" , operationType ) ;
243251 }
244252 }
245253
0 commit comments