22using System . Collections . Concurrent ;
33using System . Threading ;
44using System . Threading . Tasks ;
5+ using DisposableComponents ;
6+ using Microsoft . Extensions . Logging ;
57using Reactive . Bindings . Extensions ;
68
79namespace SimpleVolumeMixer . Core . Helper . Component ;
@@ -13,39 +15,28 @@ namespace SimpleVolumeMixer.Core.Helper.Component;
1315/// <typeparam name="TR">Type of object returned as the return value of Func</typeparam>
1416public class QueueProcessor < TP , TR > : DisposableComponent
1517{
18+ private readonly string _name ;
19+ private readonly ILogger _logger ;
1620 private readonly BlockingCollection < QueueProcessorItem < TP , TR > > _items ;
1721 private readonly Task _loopTask ;
1822 private readonly CancellationTokenSource _cancellation ;
1923
2024 /// <summary>
2125 /// ctor
2226 /// </summary>
27+ /// <param name="name"></param>
28+ /// <param name="logger"></param>
2329 /// <param name="capacity">Maximum number of queues registered</param>
24- public QueueProcessor ( int capacity = 4096 )
30+ public QueueProcessor ( string name , ILogger logger , int capacity = 4096 )
2531 {
32+ _name = name ;
33+ _logger = logger ;
2634 _items = new BlockingCollection < QueueProcessorItem < TP , TR > > ( capacity ) . AddTo ( Disposable ) ;
27- _loopTask = new Task ( ( ) => DoProcess ( ) ) . AddTo ( Disposable ) ;
2835 _cancellation = new CancellationTokenSource ( ) . AddTo ( Disposable ) ;
29- }
30-
31- /// <summary>
32- /// Starts a worker thread that executes the Func registered in the Queue.
33- /// The thread that invokes this method will not be blocked.
34- /// </summary>
35- public void StartRequest ( )
36- {
36+ _loopTask = new Task ( ( ) => DoProcess ( ) , _cancellation . Token ) . AddTo ( Disposable ) ;
3737 _loopTask . Start ( ) ;
3838 }
3939
40- /// <summary>
41- /// Sends a stop request to the worker thread.
42- /// Note that this method call does not necessarily stop the worker thread immediately.
43- /// </summary>
44- public void StopRequest ( )
45- {
46- _cancellation . Cancel ( false ) ;
47- }
48-
4940 /// <summary>
5041 /// Register Func (and its auxiliary objects) in the queue.
5142 /// When the number of registrations in the queue reaches the capacity set in the constructor,
@@ -54,6 +45,11 @@ public void StopRequest()
5445 /// <param name="item"></param>
5546 public void Push ( QueueProcessorItem < TP , TR > item )
5647 {
48+ if ( IsDisposed )
49+ {
50+ return ;
51+ }
52+
5753 _items . Add ( item ) ;
5854 }
5955
@@ -62,24 +58,55 @@ public void Push(QueueProcessorItem<TP, TR> item)
6258 /// </summary>
6359 private void DoProcess ( )
6460 {
61+ _logger . LogInformation ( "[{}] start looping..." , _name ) ;
62+
6563 while ( ! _cancellation . IsCancellationRequested )
6664 {
67- var item = _items . Take ( ) ;
68- if ( item . CancelRequest )
65+ try
6966 {
67+ var item = _items . Take ( _cancellation . Token ) ;
68+ if ( item . CancelRequest )
69+ {
70+ item . Executed = true ;
71+ continue ;
72+ }
73+
74+ item . Result = item . Function ( item . Argument ) ;
7075 item . Executed = true ;
71- continue ;
7276 }
73-
74- item . Result = item . Function ( item . Argument ) ;
75- item . Executed = true ;
77+ catch ( OperationCanceledException ex )
78+ {
79+ _logger . LogError ( ex , "[{}] canceled" , _name ) ;
80+ }
7681 }
82+
83+ _logger . LogInformation ( "[{}] finish looping..." , _name ) ;
7784 }
7885
7986 /// <inheritdoc cref="DisposableComponent.OnDisposing"/>
8087 protected override void OnDisposing ( )
8188 {
82- StopRequest ( ) ;
89+ _logger . LogInformation ( "[{}] disposing..." , _name ) ;
90+
91+ _cancellation . Cancel ( false ) ;
92+
93+ switch ( _loopTask . Status )
94+ {
95+ case TaskStatus . Canceled :
96+ case TaskStatus . Faulted :
97+ case TaskStatus . RanToCompletion :
98+ break ;
99+ default :
100+ _loopTask . Wait ( ) ;
101+ break ;
102+ }
103+
83104 base . OnDisposing ( ) ;
84105 }
106+
107+ protected override void OnDisposed ( )
108+ {
109+ _logger . LogInformation ( "[{}] disposed..." , _name ) ;
110+ base . OnDisposed ( ) ;
111+ }
85112}
0 commit comments