1+ using System ;
2+ using System . Collections . Concurrent ;
3+ using System . Threading ;
4+ using System . Threading . Tasks ;
5+
6+ namespace ZeroCode
7+ {
8+ /// <summary>
9+ /// Helper class for invoking methods in concurrent mode. It may help to queue requests to database on application
10+ /// layer instead of database layer.
11+ /// </summary>
12+ public class ConcurrentExecutor
13+ {
14+ /// <summary>
15+ /// Synchronization primitive that will be control concurrent methods invoke
16+ /// </summary>
17+ private readonly SemaphoreSlim _semaphore ;
18+
19+ /// <summary>
20+ /// Count of maximum tasks of invoking methods that can be executed in meantime
21+ /// </summary>
22+ private int _maxParallelTasks ;
23+
24+ public ConcurrentExecutor ( int maxParallelTasks )
25+ {
26+ if ( maxParallelTasks <= 0 )
27+ throw new ArgumentOutOfRangeException ( nameof ( maxParallelTasks ) ,
28+ "Value of maximum parallel executing tasks must be more then 0" ) ;
29+
30+ _maxParallelTasks = maxParallelTasks ;
31+ _semaphore = new SemaphoreSlim ( _maxParallelTasks ) ;
32+ }
33+
34+ /// <summary>
35+ /// Change count of maximum methods that will be invoking in meantime. This method can be executing too long when
36+ /// lowering count of maximum parallel tasks.
37+ /// </summary>
38+ /// <param name="maxParallelTasks"></param>
39+ /// <returns></returns>
40+ /// <exception cref="ArgumentOutOfRangeException"></exception>
41+ public ConcurrentExecutor ChangeMaxParallelTaskCount ( int maxParallelTasks )
42+ {
43+ if ( maxParallelTasks <= 0 )
44+ throw new ArgumentOutOfRangeException ( nameof ( maxParallelTasks ) ,
45+ "Value of maximum parallel executing tasks must be more then 0" ) ;
46+
47+ var oldValue = Interlocked . Exchange ( ref _maxParallelTasks , maxParallelTasks ) ;
48+ var diff = maxParallelTasks - oldValue ;
49+
50+ // If value of max parallel now more than previous, we must release new slots in semaphore
51+ if ( diff > 0 ) _semaphore . Release ( diff ) ;
52+
53+ // If value of max parallel tasks now less than previous, we must use semaphore slots
54+ if ( diff < 0 )
55+ for ( var i = 0 ; i < Math . Abs ( diff ) ; i ++ )
56+ _semaphore . Wait ( ) ;
57+
58+ return this ;
59+ }
60+
61+ /// <summary>
62+ /// Execute in concurrent mode <paramref name="wrapped" />
63+ /// </summary>
64+ /// <typeparam name="TResult">Any result type</typeparam>
65+ /// <param name="wrapped">Method that must be invoked in concurrent mode</param>
66+ /// <param name="timeout">Timeout of waiting invoking in concurrent queue</param>
67+ /// <param name="token"></param>
68+ /// <returns></returns>
69+ /// <exception cref="ArgumentNullException"></exception>
70+ /// <exception cref="TimeoutException">If execution was timed out</exception>
71+ public TResult Execute < TResult > ( Func < TResult > wrapped , TimeSpan timeout = default ,
72+ CancellationToken token = default )
73+ {
74+ if ( wrapped == null ) throw new ArgumentNullException ( nameof ( wrapped ) ) ;
75+
76+ try
77+ {
78+ if ( timeout == TimeSpan . Zero )
79+ _semaphore . Wait ( token ) ;
80+ else if ( ! _semaphore . Wait ( timeout , token ) ) throw new TimeoutException ( ) ;
81+
82+ return wrapped ( ) ;
83+ }
84+ finally
85+ {
86+ _semaphore . Release ( ) ;
87+ }
88+ }
89+
90+ /// <inheritdoc cref="Execute{TResult}" />
91+ public void Execute ( Action wrapped , TimeSpan timeout = default ,
92+ CancellationToken token = default )
93+ {
94+ if ( wrapped == null ) throw new ArgumentNullException ( nameof ( wrapped ) ) ;
95+
96+ try
97+ {
98+ if ( timeout == TimeSpan . Zero )
99+ _semaphore . Wait ( token ) ;
100+ else if ( ! _semaphore . Wait ( timeout , token ) ) throw new TimeoutException ( ) ;
101+
102+ wrapped ( ) ;
103+ }
104+ finally
105+ {
106+ _semaphore . Release ( ) ;
107+ }
108+ }
109+
110+ /// <inheritdoc cref="Execute{TResult}" />
111+ public async Task < TResult > ExecuteAsync < TResult > ( Func < Task < TResult > > wrapped , TimeSpan timeout = default ,
112+ CancellationToken token = default )
113+ {
114+ if ( wrapped == null ) throw new ArgumentNullException ( nameof ( wrapped ) ) ;
115+
116+ try
117+ {
118+ if ( timeout == TimeSpan . Zero )
119+ await _semaphore . WaitAsync ( token ) ;
120+ else if ( ! await _semaphore . WaitAsync ( timeout , token ) ) throw new TimeoutException ( ) ;
121+
122+ return await wrapped ( ) ;
123+ }
124+ finally
125+ {
126+ _semaphore . Release ( ) ;
127+ }
128+ }
129+
130+ /// <inheritdoc cref="Execute{TResult}" />
131+ public async Task ExecuteAsync ( Func < Task > wrapped , TimeSpan timeout = default ,
132+ CancellationToken token = default )
133+ {
134+ if ( wrapped == null ) throw new ArgumentNullException ( nameof ( wrapped ) ) ;
135+
136+ try
137+ {
138+ if ( timeout == TimeSpan . Zero )
139+ await _semaphore . WaitAsync ( token ) ;
140+ else if ( ! await _semaphore . WaitAsync ( timeout , token ) ) throw new TimeoutException ( ) ;
141+
142+ await wrapped ( ) ;
143+ }
144+ finally
145+ {
146+ _semaphore . Release ( ) ;
147+ }
148+ }
149+ }
150+
151+ /// <summary>
152+ /// Static cache of <see cref="ConcurrentExecutor" />
153+ /// </summary>
154+ public static class ConcurrentExecutorCache
155+ {
156+ /// <summary>
157+ /// Cache of all created named executors
158+ /// </summary>
159+ private static readonly ConcurrentDictionary < string , ConcurrentExecutor > Executors =
160+ new ConcurrentDictionary < string , ConcurrentExecutor > ( ) ;
161+
162+ /// <summary>
163+ /// Returns existing or created new named executor
164+ /// </summary>
165+ /// <param name="name"></param>
166+ /// <param name="maxParallelTasks"></param>
167+ /// <returns></returns>
168+ /// <exception cref="ArgumentNullException"></exception>
169+ public static ConcurrentExecutor GetOrCreate ( string name , int maxParallelTasks )
170+ {
171+ if ( name == null ) throw new ArgumentNullException ( nameof ( name ) ) ;
172+
173+ return Executors . GetOrAdd ( name , new ConcurrentExecutor ( maxParallelTasks ) ) ;
174+ }
175+
176+ /// <summary>
177+ /// Clear cache of named executors
178+ /// </summary>
179+ public static void ClearExecutors ( )
180+ {
181+ Executors . Clear ( ) ;
182+ }
183+ }
184+ }
0 commit comments