@@ -21,12 +21,14 @@ public sealed class MailFolderMonitor : IMailFolderMonitor
2121 {
2222 private Func < IMessageSummary , Task > _messageArrivalMethod ;
2323 private Func < IMessageSummary , Task > _messageDepartureMethod ;
24+ private Func < IMessageSummary , Task > _messageFlagsChangedMethod ;
2425
2526 private static readonly Task _completedTask = Task . CompletedTask ;
2627 private readonly object _cacheLock = new object ( ) ;
2728 private readonly IList < IMessageSummary > _messageCache = new List < IMessageSummary > ( ) ;
2829 private readonly ConcurrentQueue < IMessageSummary > _arrivalQueue = new ConcurrentQueue < IMessageSummary > ( ) ;
2930 private readonly ConcurrentQueue < IMessageSummary > _departureQueue = new ConcurrentQueue < IMessageSummary > ( ) ;
31+ private readonly ConcurrentQueue < IMessageSummary > _flagChangeQueue = new ConcurrentQueue < IMessageSummary > ( ) ;
3032 private CancellationTokenSource _arrival = new CancellationTokenSource ( ) ;
3133 private CancellationTokenSource _cancel ;
3234 private IImapClient _imapClient ;
@@ -56,6 +58,11 @@ public MailFolderMonitor(IImapReceiver imapReceiver, IOptions<FolderMonitorOptio
5658 _logger . Log < MailFolderMonitor > ( $ "{ _imapReceiver } message #{ m . UniqueId } departure processed.", LogLevel . Debug ) ;
5759 return _completedTask ;
5860 } ;
61+ _messageFlagsChangedMethod = ( m ) =>
62+ {
63+ _logger . Log < MailFolderMonitor > ( $ "{ _imapReceiver } message #{ m . UniqueId } flag change processed.", LogLevel . Debug ) ;
64+ return _completedTask ;
65+ } ;
5966 }
6067
6168 public static MailFolderMonitor Create ( FolderMonitorOptions folderMonitorOptions , ILogger < MailFolderMonitor > logger = null , ILogger < ImapReceiver > logImap = null , IProtocolLogger protocolLogger = null , IImapClient imapClient = null )
@@ -165,6 +172,12 @@ public IMailFolderMonitor OnMessageDeparture(Func<IMessageSummary, Task> message
165172 return this ;
166173 }
167174
175+ public IMailFolderMonitor OnMessageFlagsChanged ( Func < IMessageSummary , Task > messageFlagsChangedMethod )
176+ {
177+ _messageFlagsChangedMethod = messageFlagsChangedMethod ;
178+ return this ;
179+ }
180+
168181 public IMailFolderMonitor OnMessageArrival ( Action < IMessageSummary > messageArrivalMethod ) =>
169182 OnMessageArrival ( ( messageSummary ) =>
170183 {
@@ -179,6 +192,13 @@ public IMailFolderMonitor OnMessageDeparture(Action<IMessageSummary> messageDepa
179192 return _completedTask ;
180193 } ) ;
181194
195+ public IMailFolderMonitor OnMessageFlagsChanged ( Action < IMessageSummary > messageFlagsChangedMethod ) =>
196+ OnMessageFlagsChanged ( ( messageSummary ) =>
197+ {
198+ messageFlagsChangedMethod ( messageSummary ) ;
199+ return _completedTask ;
200+ } ) ;
201+
182202 public async Task IdleAsync ( CancellationToken cancellationToken = default )
183203 {
184204 _logger . Log < MailFolderMonitor > ( $ "{ _imapReceiver } monitoring requested.", LogLevel . Trace ) ;
@@ -188,14 +208,17 @@ public async Task IdleAsync(CancellationToken cancellationToken = default)
188208 var tasks = new Task [ ]
189209 {
190210 IdleStartAsync ( _cancel . Token ) . ContinueWith ( t =>
191- _logger . Log < MailFolderMonitor > ( t . Exception ? . InnerException ?? t . Exception ,
211+ _logger . Log < MailFolderMonitor > ( t . Exception ? . GetBaseException ( ) ,
192212 "Idle client failed." ) , TaskContinuationOptions . OnlyOnFaulted ) ,
193213 ProcessArrivalQueueAsync ( _messageArrivalMethod , _cancel . Token ) . ContinueWith ( t =>
194- _logger . Log < MailFolderMonitor > ( t . Exception ? . InnerException ?? t . Exception ,
214+ _logger . Log < MailFolderMonitor > ( t . Exception ? . GetBaseException ( ) ,
195215 "Arrival queue processing failed." ) , TaskContinuationOptions . OnlyOnFaulted ) ,
196216 ProcessDepartureQueueAsync ( _messageDepartureMethod , _cancel . Token ) . ContinueWith ( t =>
197- _logger . Log < MailFolderMonitor > ( t . Exception ? . InnerException ?? t . Exception ,
198- "Departure queue processing failed." ) , TaskContinuationOptions . OnlyOnFaulted )
217+ _logger . Log < MailFolderMonitor > ( t . Exception ? . GetBaseException ( ) ,
218+ "Departure queue processing failed." ) , TaskContinuationOptions . OnlyOnFaulted ) ,
219+ ProcessFlagChangeQueueAsync ( _messageFlagsChangedMethod , _cancel . Token ) . ContinueWith ( t =>
220+ _logger . Log < MailFolderMonitor > ( t . Exception ? . GetBaseException ( ) ,
221+ "Flag change queue processing failed." ) , TaskContinuationOptions . OnlyOnFaulted )
199222 } ;
200223 await Task . WhenAll ( tasks ) . ConfigureAwait ( false ) ;
201224 _logger . Log < MailFolderMonitor > ( $ "{ _imapReceiver } monitoring complete.", LogLevel . Information ) ;
@@ -228,6 +251,7 @@ private async Task IdleStartAsync(CancellationToken cancellationToken = default)
228251
229252 _mailFolder . CountChanged += OnCountChanged ;
230253 _mailFolder . MessageExpunged += OnMessageExpunged ;
254+ _mailFolder . MessageFlagsChanged += OnFlagsChanged ;
231255
232256 if ( _mailFolder . Count > 0 )
233257 await ProcessMessagesArrivedAsync ( true , cancellationToken ) . ConfigureAwait ( false ) ;
@@ -257,6 +281,7 @@ private void Disconnect(bool throwOnFirstException)
257281 _logger . Log < MailFolderMonitor > ( "Disconnecting IMAP idle client..." , LogLevel . Trace ) ;
258282 if ( _mailFolder != null )
259283 {
284+ _mailFolder . MessageFlagsChanged += OnFlagsChanged ;
260285 _mailFolder . MessageExpunged -= OnMessageExpunged ;
261286 _mailFolder . CountChanged -= OnCountChanged ;
262287 }
@@ -266,6 +291,7 @@ private void Disconnect(bool throwOnFirstException)
266291#if NET6_0_OR_GREATER
267292 _arrivalQueue ? . Clear ( ) ;
268293 _departureQueue ? . Clear ( ) ;
294+ _flagChangeQueue ? . Clear ( ) ;
269295#endif
270296 _arrival ? . Dispose ( ) ;
271297 _cancel ? . Dispose ( ) ;
@@ -454,9 +480,9 @@ private async Task ProcessArrivalQueueAsync(Func<IMessageSummary, Task> messageA
454480 }
455481 catch ( Exception ex )
456482 {
457- _logger . Log < MailFolderMonitor > ( ex , $ "Error occurred processing arrival queue item during attempt #{ retryCount } , backing off for { _folderMonitorOptions . EmptyQueueMaxDelayMs } ms. { _imapReceiver } #{ messageSummary . UniqueId } .", LogLevel . Warning ) ;
458483 if ( messageSummary != null )
459484 _arrivalQueue . Enqueue ( messageSummary ) ;
485+ _logger . Log < MailFolderMonitor > ( ex , $ "Error occurred processing arrival queue item during attempt #{ retryCount } , backing off for { _folderMonitorOptions . EmptyQueueMaxDelayMs } ms. { _imapReceiver } #{ messageSummary ? . UniqueId } .", LogLevel . Warning ) ;
460486 await Task . Delay ( _folderMonitorOptions . EmptyQueueMaxDelayMs , cancellationToken ) . ConfigureAwait ( false ) ;
461487 }
462488 }
@@ -488,9 +514,43 @@ private async Task ProcessDepartureQueueAsync(Func<IMessageSummary, Task> messag
488514 }
489515 catch ( Exception ex )
490516 {
491- _logger . Log < MailFolderMonitor > ( ex , $ "Error occurred processing departure queue item during attempt #{ retryCount } , backing off for { _folderMonitorOptions . EmptyQueueMaxDelayMs } ms. { _imapReceiver } #{ messageSummary . UniqueId } .", LogLevel . Warning ) ;
492517 if ( messageSummary != null )
493518 _departureQueue . Enqueue ( messageSummary ) ;
519+ _logger . Log < MailFolderMonitor > ( ex , $ "Error occurred processing departure queue item during attempt #{ retryCount } , backing off for { _folderMonitorOptions . EmptyQueueMaxDelayMs } ms. { _imapReceiver } #{ messageSummary ? . UniqueId } .", LogLevel . Warning ) ;
520+ await Task . Delay ( _folderMonitorOptions . EmptyQueueMaxDelayMs , cancellationToken ) . ConfigureAwait ( false ) ;
521+ }
522+ }
523+ while ( ! cancellationToken . IsCancellationRequested && retryCount < _folderMonitorOptions . MaxRetries ) ;
524+ }
525+ }
526+
527+ private async Task ProcessFlagChangeQueueAsync ( Func < IMessageSummary , Task > messageFlagChangedMethod , CancellationToken cancellationToken = default )
528+ {
529+ int retryCount = 0 ;
530+ if ( messageFlagChangedMethod != null )
531+ {
532+ IMessageSummary messageSummary = null ;
533+ do
534+ {
535+ retryCount ++ ;
536+ try
537+ {
538+ if ( _flagChangeQueue . TryDequeue ( out messageSummary ) )
539+ await messageFlagChangedMethod ( messageSummary ) . ConfigureAwait ( false ) ;
540+ else if ( _flagChangeQueue . IsEmpty )
541+ await Task . Delay ( _folderMonitorOptions . EmptyQueueMaxDelayMs , cancellationToken ) . ConfigureAwait ( false ) ;
542+ retryCount = 0 ;
543+ }
544+ catch ( OperationCanceledException )
545+ {
546+ _logger . Log < MailFolderMonitor > ( "Flag change queue cancelled." , LogLevel . Trace ) ;
547+ break ;
548+ }
549+ catch ( Exception ex )
550+ {
551+ if ( messageSummary != null )
552+ _flagChangeQueue . Enqueue ( messageSummary ) ;
553+ _logger . Log < MailFolderMonitor > ( ex , $ "Error occurred processing flag change queue item during attempt #{ retryCount } , backing off for { _folderMonitorOptions . EmptyQueueMaxDelayMs } ms. { _imapReceiver } #{ messageSummary ? . UniqueId } .", LogLevel . Warning ) ;
494554 await Task . Delay ( _folderMonitorOptions . EmptyQueueMaxDelayMs , cancellationToken ) . ConfigureAwait ( false ) ;
495555 }
496556 }
@@ -527,27 +587,57 @@ private void OnCountChanged(object sender, EventArgs e)
527587 }
528588 }
529589
590+ /// <summary>
591+ /// Keep the message cache in sync with the <see cref="ImapFolder">mail folder</see> by adding items.
592+ /// </summary>
593+ private void OnFlagsChanged ( object sender , MessageFlagsChangedEventArgs e )
594+ {
595+ int index ;
596+ int cachedCount ;
597+ IMessageSummary messageSummary = null ;
598+ lock ( _cacheLock )
599+ {
600+ index = e . Index ;
601+ cachedCount = _messageCache . Count ;
602+ if ( index < cachedCount )
603+ {
604+ messageSummary = _messageCache [ index ] ;
605+ _flagChangeQueue . Enqueue ( messageSummary ) ;
606+ }
607+ }
608+ using ( _logger . BeginScope ( "OnFlagsChanged" ) )
609+ {
610+ if ( messageSummary != null )
611+ _logger . Log < MailFolderMonitor > ( $ "{ _imapReceiver } [{ index } ] flags have changed ({ e . Flags } ), item #{ messageSummary . UniqueId } .", LogLevel . Trace ) ;
612+ else
613+ _logger . Log < MailFolderMonitor > ( $ "{ _imapReceiver } [{ index } ] message flag change (count={ cachedCount } ) was out of range.", LogLevel . Warning ) ;
614+ }
615+ }
616+
530617 /// <summary>
531618 /// Keep the message cache in sync with the <see cref="ImapFolder">mail folder</see> by removing items.
532619 /// </summary>
533620 /// <exception cref="ArgumentOutOfRangeException">Collection index is invalid.</exception>
534621 private void OnMessageExpunged ( object sender , MessageEventArgs e )
535622 {
536- using ( _logger . BeginScope ( "OnMessageExpunged" ) )
623+ int index ;
624+ int cachedCount ;
625+ IMessageSummary messageSummary = null ;
626+ lock ( _cacheLock )
537627 {
538- int index = e . Index ;
539- var cachedCount = _messageCache . Count ;
628+ index = e . Index ;
629+ cachedCount = _messageCache . Count ;
540630 if ( index < cachedCount )
541631 {
542- IMessageSummary messageSummary ;
543- lock ( _cacheLock )
544- {
545- messageSummary = _messageCache [ index ] ;
546- _messageCache . RemoveAt ( index ) ;
547- }
632+ messageSummary = _messageCache [ index ] ;
633+ _messageCache . RemoveAt ( index ) ;
548634 _departureQueue . Enqueue ( messageSummary ) ;
549- _logger . Log < MailFolderMonitor > ( $ "{ _imapReceiver } [{ index } ] (count={ cachedCount } ) expunged, item #{ messageSummary . UniqueId } .", LogLevel . Trace ) ;
550635 }
636+ }
637+ using ( _logger . BeginScope ( "OnMessageExpunged" ) )
638+ {
639+ if ( messageSummary != null )
640+ _logger . Log < MailFolderMonitor > ( $ "{ _imapReceiver } [{ index } ] (count={ cachedCount } ) expunged, item #{ messageSummary . UniqueId } .", LogLevel . Trace ) ;
551641 else
552642 _logger . Log < MailFolderMonitor > ( $ "{ _imapReceiver } [{ index } ] (count={ cachedCount } ) was out of range.", LogLevel . Warning ) ;
553643 }
0 commit comments