22// Licensed under the Apache License, Version 2.0
33
44using System ;
5+ using System . Collections . Concurrent ;
56using System . Collections . Generic ;
67using System . ComponentModel ;
8+ using System . Linq ;
9+ using System . Threading ;
10+ using System . Threading . Tasks ;
11+ using Microsoft . Extensions . Logging ;
712using Moryx . AbstractionLayer ;
813using Moryx . AbstractionLayer . Capabilities ;
914using Moryx . AbstractionLayer . Resources ;
15+ using Moryx . ControlSystem . Activities ;
16+ using static System . Collections . Specialized . BitVector32 ;
1017
1118namespace Moryx . ControlSystem . Cells
1219{
@@ -16,21 +23,182 @@ namespace Moryx.ControlSystem.Cells
1623 [ Description ( "Base type for all cells within a production system" ) ]
1724 public abstract class Cell : Resource , ICell
1825 {
26+ private readonly ConcurrentDictionary < Session , TaskCompletionSource < Session > > _sessionCompletionSources =
27+ new ConcurrentDictionary < Session , TaskCompletionSource < Session > > ( new SessionComparer ( ) ) ;
28+
29+ /// <summary>
30+ /// CancellationTokenSource that must be canceled during <see cref="OnStop"/>.
31+ /// Used to cancel async operations during resource shutdown.
32+ /// </summary>
33+ protected readonly CancellationTokenSource LifeCycleTokenSource = new CancellationTokenSource ( ) ;
34+
35+
1936 /// <inheritdoc />
2037 public abstract IEnumerable < Session > ControlSystemAttached ( ) ;
2138
2239 /// <inheritdoc />
2340 public abstract IEnumerable < Session > ControlSystemDetached ( ) ;
2441
25- /// <inheritdoc />
42+ /// <summary>
43+ /// Callback to start an activity on the cell after a <see cref="ReadyToWork"/> event was raised.
44+ /// If not otherwise explicitly required all depending components will use the interface to refer to cells,
45+ /// making this the right place to interject and complete tasks
46+ /// started by async calls to <see cref="PublishReadyToWorkAsync(Moryx.ControlSystem.Cells.ReadyToWork)"/>.
47+ /// </summary>
48+ void ICell . StartActivity ( ActivityStart activityStart )
49+ {
50+ // check if session was started async
51+ if ( _sessionCompletionSources . TryRemove ( activityStart , out var completionSource ) )
52+ {
53+ // by setting the result PublishReadyToWorkAsync will be completed.
54+ if ( ! completionSource . TrySetResult ( activityStart ) )
55+ {
56+ Logger . Log ( LogLevel . Error , "Cannot set result of async request with session {sessionId}. [{sessionType}]" , activityStart . Id ,
57+ nameof ( ActivityStart ) ) ;
58+ }
59+ // session was started async: do NOT forward activity start to StartActivity()!
60+ return ;
61+ }
62+
63+ StartActivity ( activityStart ) ;
64+ }
65+
66+ /// <summary>
67+ /// Callback to start an activity on the cell after a <see cref="ReadyToWork"/> event was raised.
68+ /// </summary>
69+ /// <param name="activityStart"></param>
2670 public abstract void StartActivity ( ActivityStart activityStart ) ;
2771
28- /// <inheritdoc />
72+ /// <summary>
73+ /// Callback to abort a running activity on the cell after a <see cref="StartActivity(Moryx.ControlSystem.Cells.ActivityStart)"/> was received.
74+ /// Aborting might occur due to a abort of the related Job.
75+ /// If not otherwise explicitly required all depending components will use the interface to refer to cells,
76+ /// making this the right place to interject suppress ProcessAborting for pending async result calls started with <see cref="PublishActivityCompletedAsync(Moryx.ControlSystem.Cells.ActivityCompleted)"/> .
77+ /// </summary>
78+ void ICell . ProcessAborting ( IActivity affectedActivity )
79+ {
80+ var asyncResult = _sessionCompletionSources . SingleOrDefault ( pair =>
81+ pair . Key is ActivityCompleted completed &&
82+ completed . CompletedActivity . Id == affectedActivity . Id ) ;
83+ if ( asyncResult . Key != null )
84+ {
85+ Logger . Log ( LogLevel . Information , "ProcessAborting of activity {activityId} [{activityType}] was suppressed due to a pending async activity result. Session {sessionId}!" , affectedActivity . Id , affectedActivity . GetType ( ) . Name , asyncResult . Key . Id ) ;
86+ return ;
87+ }
88+ ProcessAborting ( affectedActivity ) ;
89+ }
90+
91+ /// <summary>
92+ /// Callback to abort a running activity on the cell after a <see cref="StartActivity(Moryx.ControlSystem.Cells.ActivityStart)"/> was received.
93+ /// Aborting might occur due to a abort of the related Job.
94+ /// </summary>
2995 public virtual void ProcessAborting ( IActivity affectedActivity ) { }
30-
31- /// <inheritdoc />
96+
97+ /// <summary>
98+ /// Callback to complete a sequence on the cell after a <see cref="ReadyToWork"/> or <see cref="ActivityCompleted"/> event was raised.
99+ /// If not otherwise explicitly required all depending components will use the interface to refer to cells,
100+ /// making this the right place to interject and complete tasks started by async calls
101+ /// to <see cref="PublishReadyToWorkAsync(Moryx.ControlSystem.Cells.ReadyToWork)"/> or <see cref="PublishActivityCompletedAsync(Moryx.ControlSystem.Cells.ActivityCompleted)"/> .
102+ /// </summary>
103+ void ICell . SequenceCompleted ( SequenceCompleted completed )
104+ {
105+ // check if session was started async
106+ if ( _sessionCompletionSources . TryRemove ( completed , out var completionSource ) )
107+ {
108+ // by setting the result the related async call (PublishReadyToWorkAsync or PublishActivityCompletedAsync) will be completed.
109+ if ( ! completionSource . TrySetResult ( completed ) )
110+ {
111+ Logger . Log ( LogLevel . Error , "Cannot set result of async request for session {sessionId}. [{sessionType}]" , completed . Id ,
112+ nameof ( SequenceCompleted ) ) ;
113+ }
114+ // session was started async: do NOT forward SequenceCompleted to SequenceCompleted()!
115+ return ;
116+ }
117+
118+ SequenceCompleted ( completed ) ;
119+ }
120+
121+ /// <summary>
122+ /// Callback to complete a sequence on the cell after a <see cref="ReadyToWork"/> or <see cref="ActivityCompleted"/> event was raised.
123+ /// </summary>
32124 public abstract void SequenceCompleted ( SequenceCompleted completed ) ;
33-
125+
126+ /// <inheritdoc />
127+ protected override void OnStop ( )
128+ {
129+ LifeCycleTokenSource . Cancel ( ) ;
130+ LifeCycleTokenSource . Dispose ( ) ;
131+ base . OnStop ( ) ;
132+ }
133+
134+ /// <summary>
135+ /// Publish a <see cref="ReadyToWork"/> from the resource.
136+ /// Returns the <see cref="ActivityStart"/> or <see cref="Moryx.ControlSystem.Cells.SequenceCompleted"/> when returned.
137+ /// </summary>
138+ /// <exception cref="OperationCanceledException">
139+ /// Operation might be canceled due to Lifecycle-CancellationToken,
140+ /// Personal-CancellationToken, request was not able to execute or
141+ /// <see cref="PublishNotReadyToWork"/> was called for the related session.
142+ /// </exception>
143+ public Task < Session > PublishReadyToWorkAsync ( ReadyToWork readyToWork )
144+ {
145+ return PublishReadyToWorkAsync ( readyToWork , CancellationToken . None ) ;
146+ }
147+
148+ /// <summary>
149+ /// Publish a <see cref="ReadyToWork"/> from the resource.
150+ /// Returns the <see cref="ActivityStart"/> or <see cref="Moryx.ControlSystem.Cells.SequenceCompleted"/> when returned.
151+ /// </summary>
152+ /// <exception cref="OperationCanceledException">
153+ /// Operation might be canceled due to Lifecycle-CancellationToken,
154+ /// Personal-CancellationToken, request was not able to execute or
155+ /// <see cref="PublishNotReadyToWork"/> was called for the related session.
156+ /// </exception>
157+ public Task < Session > PublishReadyToWorkAsync ( ReadyToWork readyToWork , CancellationToken cancellationToken )
158+ {
159+ Logger . Log ( LogLevel . Trace , "PublishReadyToWorkAsync Session {sessionId} Type {rtwType}, Classification {classification}, {reference}" , readyToWork . Id ,
160+ readyToWork . ReadyToWorkType , readyToWork . AcceptedClassification , readyToWork . Reference ) ;
161+ using var linkedTokenSource = CancellationTokenSource . CreateLinkedTokenSource ( LifeCycleTokenSource . Token , cancellationToken ) ;
162+ var linkedToken = linkedTokenSource . Token ;
163+
164+ var completionSource = new TaskCompletionSource < Session > ( ) ;
165+ linkedToken . Register ( ( ) => completionSource . TrySetCanceled ( ) ) ;
166+ // throw exception if cancellation via token was requested
167+ linkedToken . ThrowIfCancellationRequested ( ) ;
168+
169+ // check event to be wired
170+ if ( ReadyToWork == null )
171+ {
172+ Logger . Log ( LogLevel . Error , "PublishReadyToWorkAsync for session {sessionId} canceled! ReadyToWork-Event not wired. Make sure to await ControlSystemAttached before starting any sessions!" , readyToWork . Id ) ;
173+ completionSource . TrySetCanceled ( ) ;
174+ return completionSource . Task ;
175+ }
176+
177+ if ( ! _sessionCompletionSources . TryAdd ( readyToWork , completionSource ) )
178+ {
179+ Logger . Log ( LogLevel . Error , "There is already a running async operation for Session {sessionId}! Cancel the current request!" , readyToWork . Id ) ;
180+ completionSource . TrySetCanceled ( ) ;
181+ return completionSource . Task ;
182+ }
183+
184+ ReadyToWork . Invoke ( this , readyToWork ) ;
185+
186+ Session PublishNotReadyToWorkIfCanceled ( Task < Session > task )
187+ {
188+ if ( task . IsCanceled )
189+ {
190+ Logger . Log ( LogLevel . Information , "PublishReadyToWorkAsync canceled! Session {sessionId} Publish NotReadyToWork" , readyToWork . Id ) ;
191+ // NotReadyToWork must be wired because we raised ReadyToWork before!
192+ NotReadyToWork ! . Invoke ( this , readyToWork . PauseSession ( ) ) ;
193+ }
194+
195+ return task . Result ;
196+ }
197+
198+ // now waiting for StartActivity() or SequenceCompleted()
199+ return completionSource . Task . ContinueWith ( PublishNotReadyToWorkIfCanceled ) ;
200+ }
201+
34202 /// <summary>
35203 /// Publish a <see cref="ReadyToWork"/> from the resource
36204 /// </summary>
@@ -43,26 +211,96 @@ public void PublishReadyToWork(ReadyToWork readyToWork)
43211 public event EventHandler < ReadyToWork > ReadyToWork ;
44212
45213 /// <summary>
46- /// Publish a <see cref="NotReadyToWork"/> from the resource
214+ /// Publish a <see cref="NotReadyToWork"/> from the resource.
215+ /// If the session was started within <see cref="PublishReadyToWorkAsync(Moryx.ControlSystem.Cells.ReadyToWork)"/> that async call is canceled
47216 /// </summary>
48217 public void PublishNotReadyToWork ( NotReadyToWork notReadyToWork )
49218 {
219+ if ( _sessionCompletionSources . TryRemove ( notReadyToWork , out var completionSource ) )
220+ {
221+ completionSource . TrySetCanceled ( ) ;
222+ // cancellation of the completionSource will send a NotReadyToWork
223+ return ;
224+ }
50225 NotReadyToWork ? . Invoke ( this , notReadyToWork ) ;
51226 }
52227
53228 /// <inheritdoc />
54229 public event EventHandler < NotReadyToWork > NotReadyToWork ;
55230
231+ /// <summary>
232+ /// Publish <see cref="ActivityCompleted"/> from the resource
233+ /// Returns the <see cref="Moryx.ControlSystem.Cells.SequenceCompleted"/> when returned.
234+ /// </summary>
235+ /// <exception cref="OperationCanceledException">
236+ /// Operation might be canceled due to Lifecycle-CancellationToken,
237+ /// Personal-CancellationToken or request was not able to execute.
238+ /// </exception>
239+ public Task < Session > PublishActivityCompletedAsync ( ActivityCompleted activityResult )
240+ {
241+ return PublishActivityCompletedAsync ( activityResult , CancellationToken . None ) ;
242+ }
243+
244+ /// <summary>
245+ /// Publish <see cref="ActivityCompleted"/> from the resource
246+ /// Returns the <see cref="Moryx.ControlSystem.Cells.SequenceCompleted"/> when returned.
247+ /// </summary>
248+ /// <exception cref="OperationCanceledException">
249+ /// Operation might be canceled due to Lifecycle-CancellationToken,
250+ /// Personal-CancellationToken or request was not able to execute.
251+ /// </exception>
252+ public Task < Session > PublishActivityCompletedAsync ( ActivityCompleted activityResult , CancellationToken cancellationToken )
253+ {
254+ Logger . Log ( LogLevel . Trace , "PublishActivityCompletedAsync Session {sessionId}, Classification {classification}, {reference}" , activityResult . Id ,
255+ activityResult . AcceptedClassification , activityResult . Reference ) ;
256+ using var linkedTokenSource = CancellationTokenSource . CreateLinkedTokenSource ( LifeCycleTokenSource . Token , cancellationToken ) ;
257+ var linkedToken = linkedTokenSource . Token ;
258+
259+ var completionSource = new TaskCompletionSource < Session > ( ) ;
260+ linkedToken . Register ( ( ) => completionSource . TrySetCanceled ( ) ) ;
261+ // throw exception if cancellation via token was requested
262+ linkedToken . ThrowIfCancellationRequested ( ) ;
263+
264+ if ( _sessionCompletionSources . TryAdd ( activityResult , completionSource ) )
265+ {
266+ Logger . Log ( LogLevel . Warning , "There is already a running async operation for Session {sessionId}! Cancel current request!" , activityResult . Id ) ;
267+ completionSource . TrySetCanceled ( ) ;
268+ return completionSource . Task ;
269+ }
270+ // ActivityCompleted must be wired because we received the ActivityStart before!
271+ ActivityCompleted ! . Invoke ( this , activityResult ) ;
272+ return completionSource . Task ;
273+ }
274+
56275 /// <summary>
57276 /// Publish <see cref="ActivityCompleted"/> from the resource
58277 /// </summary>
59- /// <param name="activityResult"></param>
60278 public void PublishActivityCompleted ( ActivityCompleted activityResult )
61279 {
62280 ActivityCompleted ? . Invoke ( this , activityResult ) ;
63281 }
64282
65283 /// <inheritdoc />
66284 public event EventHandler < ActivityCompleted > ActivityCompleted ;
285+
286+ /// <summary>
287+ /// IEqualityComparer to compare different sessions in a dictionary by their Id.
288+ /// </summary>
289+ private class SessionComparer : IEqualityComparer < Session >
290+ {
291+ /// <inheritdoc />
292+ public bool Equals ( Session x , Session y )
293+ {
294+ if ( x == null || y == null )
295+ return false ;
296+ return x . Id . Equals ( y . Id ) ;
297+ }
298+
299+ /// <inheritdoc />
300+ public int GetHashCode ( Session obj )
301+ {
302+ return obj . Id . GetHashCode ( ) ;
303+ }
304+ }
67305 }
68306}
0 commit comments